Notebook pre steps¶
#@title Installations
# ALWAYS SAVE YOUR OWN COPY OF THIS NOTEBOOK: File > Save a copy in Drive
# IF DANISH MENU DO: Hjælp > Engelsk version
# To clear output do: Edit > Clear all outputs
## Useful shortcuts
# Run current cell: Cmd+Enter
# Run current cell and goto next: Shift+Enter
# Run selection (or line if no selection): Cmd+Shift+Enter
# install missing packages
!pip install dfply
!python --version
# !pip install plotly --quiet
import plotly.io as pio
pio.renderers.default = "colab"
from plotly.subplots import make_subplots
import plotly.graph_objects as go
from dfply import *
from plotnine import *
import matplotlib.pyplot as plt
import numpy as np # RNG and vector ops
import pandas as pd # tabular outputs
import math
import random
from IPython.display import display, Markdown
from tqdm import tqdm
# import json
# from pathlib import Path
On-policy Control with approximation¶
In the previous module, the focus was on predicting the state values of a policy using function approximation. Here, the emphasis is on control, specifically finding an optimal policy through function approximation of action values $ \hat q(s, a, \textbf{w})$. Once again, the focus remains on on-policy methods.
In the episodic case, the extension from prediction to control is straightforward, but in the continuing case, discounting is not suitable for finding an optimal policy. Surprisingly, once we use genuine function approximation, we have to give up discounting and switch to an “average-reward” formulation.
Episodic Semi-gradient Control¶
Consider episodic tasks. To find a good policy, the action-value function is approximated by a differentiable function $\hat q(s,a,\mathbf{w}) \approx q^\pi(s,a)$ with weight vector $\mathbf{w}$.
Training examples now take the form $(S_t, A_t) \mapsto U_t$, where $U_t$ is any target approximating $q^\pi(S_t,A_t)$. For instance, for a one-step semi-gradient, we have
$$
U_t = R_{t+1} + \gamma\, \hat q(S_{t+1}, A_{t+1}, \mathbf{w}_t),
$$
which bootstraps from the next state–action estimate produced by the same $\varepsilon$-greedy policy. An $n$-step extension could also be used, where the target accumulates the next $n$ rewards before bootstrapping:
$$
U_t = \sum_{k=1}^{n} \gamma^{k-1} R_{t+k}
\;+\;
\gamma^{n}\, \hat q(S_{t+n}, A_{t+n}, \mathbf{w}_{t+n-1}).
$$
This provides a spectrum between Monte Carlo (large $n$) and one-step bootstrapping (small $n$).
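To make the $n$-step target concrete, here is a minimal sketch of how it could be computed from a buffer of rewards; the reward list and the approximator q_hat (with an eval(s, a) method, as used later in this notebook) are placeholders for illustration:
def n_step_sarsa_target(rewards, gamma, q_hat, s_boot, a_boot):
    """n-step Sarsa target: n discounted rewards plus a discounted bootstrap term.

    rewards: [R_{t+1}, ..., R_{t+n}]; (s_boot, a_boot) is (S_{t+n}, A_{t+n}).
    """
    n = len(rewards)
    g = sum(gamma ** k * r for k, r in enumerate(rewards))  # sum_{k=1}^{n} gamma^{k-1} R_{t+k}
    return g + gamma ** n * q_hat.eval(s_boot, a_boot)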
Learning is done using semi-gradient stochastic gradient descent on the squared error, yielding the generic update $$ \mathbf{w}_{t+1} = \mathbf{w}_t + \alpha\big[U_t - \hat q(S_t,A_t,\mathbf{w}_t)\big]\nabla_{\mathbf{w}} \hat q(S_t,A_t,\mathbf{w}_t). $$ This is the direct action-value analogue of semi-gradient TD for state values.
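For a linear approximator $\hat q(s,a,\mathbf{w}) = \mathbf{w}^\top \mathbf{x}(s,a)$ the gradient is simply the feature vector, so a single update can be sketched as follows (the feature function x is a hypothetical placeholder, not part of the implementation below):
import numpy as np

def semi_grad_sarsa_update(w, x, s, a, target, alpha):
    """One semi-gradient update for a linear q_hat(s, a, w) = w @ x(s, a)."""
    feat = x(s, a)            # feature vector = gradient of q_hat with respect to w
    q_sa = w @ feat           # current estimate q_hat(s, a, w)
    return w + alpha * (target - q_sa) * feat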
To turn prediction into control, techniques for policy improvement and action selection are needed. If the action set is discrete and not too large, then we can use the techniques already developed in previous chapters: for instance, explore using an $\varepsilon$-greedy policy and update action values using Sarsa-style targets.
Let us implement the episodic Semi-Gradient Sarsa algorithm (Sutton et al. p244):

#@title Episodic semi-gradient SARSA
def ep_semi_grad_sarsa(
env,
episodes,
gamma,
eps,
q_hat,
s_start=None,
callback=None,
trace=None,
max_steps=None,
eps_decay=None # e.g., 0.995 or a callable: lambda ep, eps: ...
):
"""
Episodic Semi-Gradient SARSA.
Params:
env: has get_actions(s), get_step(s, a), reset()
episodes: number of episodes
gamma: discount factor in [0, 1]
eps: initial epsilon for epsilon-greedy
q_hat: approximator with:
eval(s, a) -> float
train(s, a, target) -> None (performs semi-gradient update internally)
s_start: None -> use env.reset(); else a sequence of start states to
sample from, e.g. [[65, 1]].
callback(ep, q_hat, trace, info): called at end of each episode (1-based ep)
trace: passed through to callback
max_steps: optional cap on steps per episode
eps_decay: optional epsilon schedule. If float in (0,1), eps *= eps_decay each ep.
If callable, eps = eps_decay(ep, eps).
"""
def argmax_rand(values):
"""Index of a max value, breaking ties uniformly at random."""
vmax = np.max(values)
idxs = np.flatnonzero(values == vmax)
return np.random.choice(idxs)
def policy(s, eps_curr):
"""Epsilon-greedy over env.get_actions(s) using q_hat."""
actions = env.get_actions(s)
if len(actions) == 0:
raise ValueError("env.get_actions(s) returned no actions.")
if np.random.rand() < eps_curr:
return np.random.choice(actions)
q_values = np.array([q_hat.eval(s, a) for a in actions], dtype=float)
return actions[argmax_rand(q_values)]
def pick_start(s_start):
if s_start is None:
return env.reset()
if len(s_start) == 1:
return s_start[0]
idx = np.random.choice(range(len(s_start)))
s = s_start[idx]
return s
eps_curr = float(eps)
for ep in tqdm(range(1, episodes + 1), desc="Episode", unit="episode",
bar_format='{l_bar}{bar}| {n}/{total} [{elapsed}<{remaining}, {rate_fmt}{postfix}]'):
s = pick_start(s_start)
a = policy(s, eps_curr)
steps = 0
while True:
s_n, r, done = env.get_step(s, a)
# print("s,a,s_n,r,done:",s,a,s_n,r,done)
if done:
td_target = r # Q(s', a') = 0 for terminal
q_hat.train(s, a, td_target)
break
else:
a_n = policy(s_n, eps_curr)
td_target = r + gamma * q_hat.eval(s_n, a_n)
q_hat.train(s, a, td_target)
s, a = s_n, a_n
steps += 1
if max_steps is not None and steps >= max_steps:
# Force end of episode if a cap is set
break
if callback is not None:
info = {"epsilon": eps_curr, "steps": steps}
callback(ep, q_hat, trace, info)
# Update epsilon if requested
if eps_decay is not None:
if callable(eps_decay):
eps_curr = float(eps_decay(ep, eps_curr))
else:
eps_curr *= float(eps_decay)
eps_curr = max(0.0, min(1.0, eps_curr))
Note the following differences in the implementation:
- $\hat{q}$ is a black-box trainable function approximator class, so we avoid passing around $\mathbf{w}$ and computing/storing $\nabla\hat{q}(S,A,\mathbf{w})$ explicitly (a minimal interface sketch follows this list).
- $\mathbf{w}$ is not initialized inside the function, since we may want to continue training an existing approximator.
- It is possible to choose the start states.
- An epsilon decay schedule can be used for the $\varepsilon$-greedy policy.
- Callback and trace parameters are added, so we can track what is going on.
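To make the expected q_hat interface concrete, here is a minimal, purely illustrative tabular approximator that would also work as q_hat; it simply stores one value per state-action pair and moves it towards each target:
class TabularQ:
    """Minimal q_hat interface: eval(s, a) and train(s, a, target)."""
    def __init__(self, alpha=0.1, init_val=0.0):
        self.alpha = alpha
        self.init_val = init_val
        self.q = {}  # (state, action) -> value

    def eval(self, s, a):
        return self.q.get((tuple(s), a), self.init_val)

    def train(self, s, a, target):
        key = (tuple(s), a)
        old = self.q.get(key, self.init_val)
        self.q[key] = old + self.alpha * (target - old)  # move a step towards the target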
Seasonal inventory and sales planning¶
Let us solve the seasonal inventory and sales planning problem in Example 9.4.4.
We consider a seasonal product such as garden furniture. Assume that the maximum inventory level is $Q$ items, i.e. we can buy at most $Q$ items at the start of the season for a fixed price. The product can be sold for at most $T$ weeks, and at the end of the period (week $T$) the remaining inventory is sold to an outlet store at a scrap price.
Let $s = (q,t)$ denote the state of the system at the start of a week. We have a terminal state (inventory empty). Actions correspond to the price chosen; let us limit ourselves to the prices $\{10, 15, 20, 25\}$.
The inventory dynamics and rewards are coded in the environment below:
#@title RL environment - Seasonal
from __future__ import annotations # forward refs
import numpy as np
import pandas as pd
class RLEnvSeasonal:
"""
Seasonal single-item pricing/sales environment.
State space:
- non-terminal states are strings "q,t" where q∈{1..max_inv}, t∈{1..max_t}
- terminal state is "0"
Action space:
- at t < max_t: choose a price from self.prices (as string)
- at t == max_t: action is effectively a dummy (we scrap remaining inventory)
Stochastic demand depends on price via a piecewise curve and has early-season uplift.
"""
def __init__(self,
max_inv: int,
max_t: int,
scrap_price: float,
purchase_price: float,
prices: list[float],
seed = None) -> None:
"""
Initializes the seasonal RL environment.
Args:
max_inv: Maximum inventory units.
max_t: Number of selling weeks.
scrap_price: Value per leftover at final week.
purchase_price: Unit purchase cost at t=1.
prices: Allowable sale prices.
"""
self.max_inv = int(max_inv)
self.max_t = int(max_t)
self.scrap_price = float(scrap_price)
self.purchase_price = float(purchase_price)
self.prices = list(map(float, prices))
self.rng = np.random.default_rng(seed)
self.rng_seed = seed
def reset(self):
"""
Reset the environment to the initial state.
Returns:
The initial state.
"""
t = 1 # start at time 1
q = self.rng.integers(1, self.max_inv + 1)
return [int(q), t]
def reset_rng(self):
"""
Reset the random number generator.
Returns:
The initial state.
"""
self.rng = np.random.default_rng(self.rng_seed)
return self.reset()
# ----------------------------- state & action spaces ----------------------
# def get_states(self) -> list[str]:
# """
# Return all state keys as strings plus terminal '0'.
# Returns:
# A list of state identifiers.
# """
# # Cartesian product of q=1..max_inv and t=1..max_t
# grid = pd.MultiIndex.from_product(
# [range(1, self.max_inv + 1), range(1, self.max_t + 1)],
# names=["q", "t"],
# ).to_frame(index=False)
# states = [f"{int(row.q)},{int(row.t)}" for row in grid.itertuples(index=False)]
# return states + ["0"]
def get_actions(self, s) -> list:
"""
Return available actions for state s.
Args:
s: The state.
Returns:
A list of actions for the given state.
"""
assert len(s) == 2
q, t = s[0], s[1]
assert np.isscalar(t)
assert 1 <= t <= self.max_t
if t == self.max_t:
return [self.scrap_price]
assert np.isscalar(q)
assert 1 <= q <= self.max_inv
return self.prices
# ----------------------------- demand model --------------------------------
def get_demand(self, price: float, t: int) -> int:
"""
Sample a stochastic demand for a given price and week.
Piecewise base demand:
- linear between (10,20) and (12,12)
- linear between (12,12) and (15,10)
- log tail beyond 15
Early-season uplift for t <= max_t/2.
Args:
price: The price.
t: The week.
Returns:
The sampled demand as an integer.
"""
# l1: between price 10..12
# l2: between price 12..15
# l3: beyond 15, log decay anchored at (15,10)
if price <= 12.0:
a = (20.0 - 12.0) / (10.0 - 12.0)
b = 20.0 - a * 10.0
d = a * price + b
d_s = d * self.rng.uniform(0.75, 1.25)
elif 12.0 <= price <= 15.0:
a = (12.0 - 10.0) / (12.0 - 15.0)
b = 12.0 - a * 12.0
d = a * price + b
d_s = d * self.rng.uniform(0.75, 1.25)
else:
d = -4.0 * np.log(price - 15.0 + 1.0) + 10.0
d_s = d * self.rng.uniform(1.0, 2.0)
if t <= self.max_t / 2.0:
d_s *= self.rng.uniform(1.0, 1.2)
return int(round(max(0.0, d_s)))
# ----------------------------- single-step API for RLAgent -----------------
def get_step(self, s, a: float) -> list:
"""
One simulated step: given (s, a) return (s_n, r, terminal).
Args:
s: The current state (list).
a: The action taken (scalar).
Returns:
The next state, reward, and terminal flag.
"""
assert np.isscalar(a)
assert len(s) == 2
q, t = s[0], s[1]
assert np.isscalar(q)
assert np.isscalar(t)
if q == 0:
return (None, 0.0, True)
if t == self.max_t:
r = self.scrap_price * q
return (None, r, True)
price = float(a)
d = self.get_demand(price, t)
sold = min(q, d)
r = price * sold
if t == 1:
r -= q * self.purchase_price
q_n = q - sold
if q_n == 0:
return (None, r, True)
return ([q_n, t+1], r, False)
We define a method get_step for getting the next reward and state in an episode. Moreover, a boolean is returned, which is true if we reach the terminal state. Note that here we have no information about transition probabilities, the set of possible states, etc.
Let us take a closer look at the environment:
# define candidate sale prices
prices = [10, 15, 20, 25]
## Test 1
# instantiate environment
env = RLEnvSeasonal(
max_inv=100, # maximum inventory
max_t=4, # selling horizon (weeks)
scrap_price=5.0, # scrap value at final week
purchase_price=14.0, # cost of each unit purchased at t=1
prices=prices, # allowable selling prices
seed=876
)
# Try to pick some state and actions
print("Start episode:")
s = [100,1]
print("Start state:", s)
for _ in range(100):
actions = env.get_actions(s)
print("Possible actions", actions)
a = env.rng.choice(actions)
print("Pick price:", a)
s_n, r, done = env.get_step(s, a)
print("Next state, reward, done:", s_n, r, done)
s = s_n
if done:
break
## Test 2
# instantiate environment
env = RLEnvSeasonal(
max_inv=100, # maximum inventory
max_t=15, # selling horizon (weeks)
scrap_price=5.0, # scrap value at final week
purchase_price=14.0, # cost of each unit purchased at t=1
prices=prices, # allowable selling prices
seed=876
)
# Try to pick some state and actions
print("\nStart episode:")
s = env.reset()
print("Start state:", s)
for _ in range(100):
actions = env.get_actions(s)
print("Possible actions", actions)
if 25 in actions:
a = 25
else:
a = env.rng.choice(actions)
print("Pick price:", a)
s_n, r, done = env.get_step(s, a)
print("Next state, reward, done:", s_n, r, done)
s = s_n
if done:
break
Your turn¶
Explain the output.
In the first test, we start in state $(q,t)=(100,1)$, pick an action randomly, and continue until no inventory is left. In the second test, we pick a random start state ($t=1$) and set the price to 25 whenever possible, until the episode terminates (time horizon reached or inventory empty).
Control using function approximation and tile coding¶
Let us try to solve the control problem using tile coding. First, we need the tile coder class (use as is):
#@title TileCoder class (use as is)
import numpy as np
import pandas as pd
from plotnine import (
ggplot, aes, geom_rect, geom_text, geom_vline,
scale_y_reverse, labs, theme_bw, theme
)
import hashlib
from typing import Iterable, Sequence, Tuple, Union
# =============================================================
# A practical TileCoder for RL (1D and ND)
# - Deterministic, evenly spaced offsets by default
# - Optional randomness via seed
# - Returns sparse active indices; optional dense/hashed encoding
# - Supports wrapping (per-dimension or global)
# - Exposes 1D attributes so existing plotting utilities work
# =============================================================
class TileCoder:
"""
A practical TileCoder for RL (1D and ND).
- Deterministic, evenly spaced offsets by default
- Optional randomness via seed
- Returns sparse active indices
- Supports wrapping (per-dimension or global)
- Exposes 1D attributes so existing plotting utilities work
"""
def __init__(
self,
n_tilings: int,
tiles_per_dim: Union[int, Sequence[int]],
ranges: Union[None, Sequence[Tuple[float, float]]] = None,
wrap: Union[bool, Sequence[bool]] = False,
seed: Union[None, int] = None,
deterministic: bool = True,
hash_size: Union[None, int] = None,
):
"""
A practical TileCoder for RL (1D and ND).
Args:
n_tilings (int): Number of tilings.
tiles_per_dim (Union[int, Sequence[int]]): Tiles per dimension.
ranges (Union[None, Sequence[Tuple[float, float]]], optional): Ranges for normalization to [0,1]. Defaults to None
wrap (Union[bool, Sequence[bool]], optional): Wrap flags per dimension. Defaults to False
seed (Union[None, int], optional): Random seed. Defaults to None
deterministic (bool, optional): Use deterministic offsets. Defaults to True
hash_size (Union[None, int], optional): Hash size for sparse encoding. Defaults to None
"""
assert n_tilings >= 1
self.n_tilings = int(n_tilings)
if isinstance(tiles_per_dim, Iterable) and not isinstance(tiles_per_dim, (str, bytes)):
self.tiles_per_dim = np.array(list(tiles_per_dim), dtype=int)
else:
self.tiles_per_dim = np.array([int(tiles_per_dim)], dtype=int)
assert np.all(self.tiles_per_dim >= 2)
self.d = int(self.tiles_per_dim.size)
# Ranges for normalization to [0,1]
if ranges is None:
self.ranges = np.array([(0.0, 1.0)] * self.d, dtype=float)
else:
assert len(ranges) == self.d
self.ranges = np.array(ranges, dtype=float)
assert np.all(self.ranges[:, 1] > self.ranges[:, 0])
# Wrap flags per dimension
if isinstance(wrap, Iterable) and not isinstance(wrap, (str, bytes)):
wrap = list(wrap)
assert len(wrap) == self.d
self.wrap = np.array(wrap, dtype=bool)
else:
self.wrap = np.array([bool(wrap)] * self.d, dtype=bool)
self.hash_size = None if hash_size is None else int(hash_size)
self.deterministic = bool(deterministic)
self.seed = seed
# Precompute strides for mixed-radix indexing
self.tiles_strides = np.ones(self.d, dtype=int)
for i in range(self.d - 2, -1, -1):
self.tiles_strides[i] = self.tiles_strides[i + 1] * self.tiles_per_dim[i + 1]
self.tiles_per_tiling = int(np.prod(self.tiles_per_dim)) # for 1D this equals tiles_per_dim
# Offsets per tiling and dimension, as fractions of the unit interval
self.offsets = self._make_offsets()
# Expose attributes used by the existing 1D plotting helpers
if self.d == 1:
self.tiles_per_tiling_1d = int(self.tiles_per_dim[0])
self.offsets_1d = self.offsets[:, 0]
# Back-compat attribute names expected by the plotting code
self.tiles_per_tiling = self.tiles_per_tiling_1d
# plot helpers look for 'offsets' as 1D array; keep original 2D in _offsets2d
self._offsets2d = self.offsets
self.offsets = self.offsets_1d
@property
def n_features(self) -> int:
if self.hash_size is not None:
return self.hash_size
return int(self.n_tilings * np.prod(self.tiles_per_dim))
# ------------------------
# Offset generation
# ------------------------
def _make_offsets(self) -> np.ndarray:
"""Create an (n_tilings, d) array of offsets in [0, 1/tiles_i)."""
widths = 1.0 / self.tiles_per_dim.astype(float)
if self.deterministic:
# Evenly spaced along each dimension by fraction t/n_tilings of a bin width
t = np.arange(self.n_tilings, dtype=float).reshape(-1, 1)
base = (t / self.n_tilings) # shape (n_tilings, 1)
return (base * widths) # broadcast to (n_tilings, d)
else:
rng = np.random.default_rng(self.seed)
return rng.uniform(0.0, widths, size=(self.n_tilings, self.d))
# ------------------------
# Public API
# ------------------------
def active_indices(self, x: Union[float, Sequence[float]]) -> np.ndarray:
"""Return the global feature indices (length n_tilings) for input x.
x can be a scalar (1D) or a sequence of length d.
"""
u = self._normalize_to_unit(x) # in [0,1]^d
inds = np.empty(self.n_tilings, dtype=np.int64)
for t in range(self.n_tilings):
idxs = self._tile_indices_for_tiling(u, t)
if self.hash_size is None:
inds[t] = t * self.tiles_per_tiling + self._linearize_indices(idxs)
else:
inds[t] = self._hash_index(t, idxs)
return inds
def encode_sparse(self, x: Union[float, Sequence[float]]):
"""Return (indices, values) for sparse features with value 1.0 for each tiling."""
inds = self.active_indices(x)
vals = np.ones_like(inds, dtype=float)
return inds, vals
def encode_dense(self, x: Union[float, Sequence[float]]) -> np.ndarray:
"""Dense binary feature vector (mostly for debugging)."""
size = self.n_features
vec = np.zeros(size, dtype=float)
inds = self.active_indices(x)
vec[inds] = 1.0
return vec
# ------------------------
# Internals
# ------------------------
def _normalize_to_unit(self, x: Union[float, Sequence[float]]) -> np.ndarray:
x = np.array([x], dtype=float) if np.isscalar(x) else np.array(x, dtype=float)
assert x.size == self.d, f"Expected input of dimension {self.d}, got {x.size}"
lo = self.ranges[:, 0]
hi = self.ranges[:, 1]
u = (x - lo) / (hi - lo)
# Clamp to [0,1] to avoid numerical overflow in min/floor
return np.clip(u, 0.0, 1.0)
def _tile_indices_for_tiling(self, u: np.ndarray, t: int) -> np.ndarray:
offs = self.offsets[t]
v = u + offs
idxs = np.empty(self.d, dtype=int)
for i in range(self.d):
if self.wrap[i]:
w = v[i] % 1.0
else:
w = min(v[i], 1.0 - 1e-12)
idxs[i] = int(np.floor(w * self.tiles_per_dim[i]))
return idxs
def _linearize_indices(self, idxs: np.ndarray) -> int:
return int(np.dot(idxs, self.tiles_strides))
def _hash_index(self, t: int, idxs: np.ndarray) -> int:
# Deterministic hash of (t, idxs...)
payload = np.array([t, *idxs.tolist()], dtype=np.int64).tobytes()
h = hashlib.sha256(payload).digest()
return int.from_bytes(h[:8], 'little') % int(self.hash_size)
# ------------------------
# 1D plotting helpers as methods (plotnine)
# ------------------------
def _assert_1d(self):
assert self.d == 1, "plot_1d is only available for 1D tile coders"
def _wrap_flag_1d(self) -> bool:
return bool(self.wrap[0])
def _bin_bounds_for_tiling_1d(self, offset: float) -> list:
"""Compute (start,end) intervals for bins in one tiling.
Honors wrap setting; non-wrap lets the last tile extend to 1.0.
"""
wrap = self._wrap_flag_1d()
T = int(self.tiles_per_tiling)
width = 1.0 / T
if wrap:
starts = (np.arange(T) * width - offset) % 1.0
ends = (starts + width) % 1.0
return list(zip(starts, ends))
else:
out = []
for b in range(T):
if b < T - 1:
s = b * width - offset
e = (b + 1) * width - offset
else:
s = (T - 1) * width - offset
e = 1.0
s = max(0.0, s)
e = min(1.0, e)
out.append((s, e))
return out
def _active_tile_for_tiling_1d(self, z: float, offset: float) -> int:
wrap = self._wrap_flag_1d()
T = int(self.tiles_per_tiling)
val = (z + offset) % 1.0 if wrap else min(z + offset, 1.0 - 1e-12)
return int(np.floor(val * T))
def build_tile_df_1d(self, z: float = 0.37) -> pd.DataFrame:
self._assert_1d()
wrap = self._wrap_flag_1d()
recs = []
for t in range(self.n_tilings):
bins = self._bin_bounds_for_tiling_1d(self.offsets[t])
active_tile = self._active_tile_for_tiling_1d(z, self.offsets[t])
for b, (s, e) in enumerate(bins):
is_active = (b == active_tile)
if wrap and s > e:
recs.append(dict(
tiling=t, tile=b, xmin=0.0, xmax=e, ymin=t-0.45, ymax=t+0.45,
xcenter=(0.0 + e) / 2.0, ycenter=t, active=is_active
))
recs.append(dict(
tiling=t, tile=b, xmin=s, xmax=1.0, ymin=t-0.45, ymax=t+0.45,
xcenter=(s + 1.0) / 2.0, ycenter=t, active=is_active
))
continue
if e <= s:
continue
recs.append(dict(
tiling=t, tile=b, xmin=s, xmax=e, ymin=t-0.45, ymax=t+0.45,
xcenter=(s + e) / 2.0, ycenter=t, active=is_active
))
return pd.DataFrame(recs)
def plot_1d(self, z: float = 0.37, title: Union[None, str] = None):
"""Return a plotnine ggplot object visualizing 1D tiles and active bin per tiling."""
self._assert_1d()
df = self.build_tile_df_1d(z)
ttl = title if title else f"TileCoder (wrap={'True' if self._wrap_flag_1d() else 'False'})"
p = (
ggplot(df, aes(xmin='xmin', xmax='xmax', ymin='ymin', ymax='ymax', fill='active'))
+ geom_rect(alpha=0.35, color='black', size=0.2)
+ geom_text(aes(x='xcenter', y='ycenter', label='tile'), size=6)
+ geom_vline(xintercept=z, linetype='dashed')
+ scale_y_reverse()
+ labs(title=f"{ttl} — z={z:.2f}", x='z in [0,1]', y='Tiling index')
+ theme_bw()
+ theme(figure_size=(10, 2))
)
return p
def show_1d(self, z: float = 0.37, title: Union[None, str] = None):
"""Convenience: build and immediately render the 1D plot (if in an environment that supports .show())."""
return self.plot_1d(z, title).show()
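Before wiring the tile coder into a function approximator, a quick, purely illustrative sanity check of its API may be useful; the configuration below is just an example:
# 2D tile coder over q in [1, 100] and t in [1, 15]
tc_demo = TileCoder(n_tilings=8, tiles_per_dim=[8, 8],
                    ranges=[(1.0, 100.0), (1.0, 15.0)], deterministic=True)
print("Total number of features:", tc_demo.n_features)   # 8 tilings * 8 * 8 tiles = 512
print("Active indices for (q, t) = (50, 3):", tc_demo.active_indices([50, 3]))  # one index per tiling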
Next, we define the function approximator, i.e. $\hat q(s, a)$, which will be passed as the q_hat argument to ep_semi_grad_sarsa. Note that this class must have the methods eval and train.
#@title Function approx with tiles
class FuncApproxTiles:
"""
Function approximator with tile coding for the seasonal inventory control problem.
Assume tile coder (the same tiles are used) for each discrete action.
"""
def __init__(self, env, actions, step_size, nb_tilings, nb_tiles, init_val, seed=None, deterministic=False, decay_fct=1):
"""
Args:
env: The environment.
actions: A list of possible actions, e.g. [15, 20, 25].
step_size: Step size, will be adjusted for nb_tilings automatically.
nb_tilings: Number of tilings per action.
nb_tiles: Number of tiles per dimension.
init_val: Initial action-values q(s,a).
seed: Seed used by the tile class.
deterministic: If true, use deterministic offsets.
decay_fct: Decay factor for step size given a state, i.e. use
decay_fct^(num_of_visits - 1) * alpha as step size.
"""
self._n_dim = 2
self.alpha = step_size / nb_tilings
self.decay_fct = decay_fct
self._num_tilings_per_action = nb_tilings
# Ensure scrap only once
base_actions = [a for a in actions if a != env.scrap_price]
actions = [env.scrap_price] + base_actions
self.actions = actions
self.act2idx = {a: i for i, a in enumerate(actions)}
# feasible actions tile coder (use a single one = same tiles for all actions)
# note for scrap action only tiles active for t = max_t will be updated
self.tc = TileCoder(
n_tilings=nb_tilings,
tiles_per_dim=[nb_tiles, nb_tiles],
ranges=[(1.0, float(env.max_inv)), (1.0, float(env.max_t))],
wrap=[False, False],
deterministic=deterministic,
seed=seed
)
self._init_val = float(init_val)
## feature matrix (a row for each action)
self.w = np.zeros((len(actions), self.tc.n_features), dtype=np.float32) + self._init_val / nb_tilings
# a matrix with state counts
self.s_ctr = np.zeros((env.max_inv, env.max_t), dtype=int) # to store visits to s - 0 indexed
def reset_weights(self, keep_init=True):
if keep_init:
self.w[:] = self._init_val / self._num_tilings_per_action
else:
self.w.fill(0.0)
def reset_s_ctr(self):
self.s_ctr.fill(0)
def reset(self):
self.reset_weights()
self.reset_s_ctr()
def eval(self, state, action):
"""
Return Q(s, a) for a specific action.
Args:
state: The state.
action: The action.
"""
assert len(state) == self._n_dim
assert np.isscalar(action)
a = self.act2idx[action]
idx = self.tc.active_indices(state)
return float(self.w[a, idx].sum())
def train(self, state, action, target):
"""
Update Q(s, a) for a specific action using semi-gradient TD.
Args:
state: The state.
action: The action.
target: The target value.
"""
assert len(state) == self._n_dim
assert np.isscalar(action)
assert np.isscalar(target)
self.s_ctr[state[0] - 1, state[1] - 1] += 1
a = self.act2idx[action]
idx = self.tc.active_indices(state)
pred = self.w[a, idx].sum()
err = target - pred
alpha = self.alpha * (self.decay_fct ** (float(self.s_ctr[state[0] - 1, state[1] - 1]) - 1))
# alpha = self.alpha
self.w[a, idx] += alpha * err
## Testing
q_hat = FuncApproxTiles(env, actions=prices, step_size=0.1, nb_tilings=8, nb_tiles=4, init_val=0)
print("Active tiles for state [100,1]:", q_hat.tc.active_indices([100,1]))
print("Active tiles for state [10,15]:", q_hat.tc.active_indices([10,15]))
Your turn¶
How many tilings are there for each action?
We have eight tilings for each action.
How many tiles are there in each tiling?
We have $4\cdot 4 = 16$ tiles in each tiling.
Are parameters (weights) estimated with different tilings for each action?
No, we use the same tilings for each action, i.e. each action uses the same regions, but estimates different weights using these regions.
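As a quick check of these numbers on the q_hat object created above (8 tilings, 4 tiles per dimension, and the 4 prices plus the scrap action):
# 8 tilings * (4 * 4) tiles = 128 features, shared across actions
print("Features per action:", q_hat.tc.n_features)
# one weight row per action: the 4 prices plus the scrap action
print("Weight matrix shape (actions, features):", q_hat.w.shape)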
We are now ready to estimate $q(s,a)$.
# instantiate environment
env = RLEnvSeasonal(
max_inv=100, # maximum inventory
max_t=15, # selling horizon (weeks)
scrap_price=5.0, # scrap value at final week
purchase_price=14.0, # cost of each unit purchased at t=1
prices=prices, # allowable selling prices
seed = 14233
)
ep_semi_grad_sarsa(env, episodes=20, gamma=1.0, eps=0.1, q_hat=q_hat)
However, this doesn't give us any information. We need a callback function to store information during the algorithm run.
First, we would like to calculate the RMSE, so we load the optimal MDP solution for comparison:
#@title MDP solution
dat_mdp = pd.read_csv('https://drive.google.com/uc?id=1dY6r4xKFwIv3DBEHiOnehbiVEGAhqQax', index_col = False)
dat_mdp
## store as array for later use when calc RMSE
v_mdp_grid = dat_mdp.pivot(index='q', columns='t', values='v').values
vmin, vmax = np.nanmin(v_mdp_grid), np.nanmax(v_mdp_grid) # min and max value
vrange = vmax - vmin if vmax > vmin else 1.0 # range if zero set to 1 to avoid division by zero
Next, we define the callback function and other helpers:
def callback(ep, func_approx, trace, info, eval_every=50):
"""
Callback for episodic RL that computes RMSE and normalized RMSE vs. MDP optimal values.
Requires global: env, v_mdp_grid, vrange
"""
if ep == 1 or ep % eval_every == 0:
# Evaluate learned Q(s,a) → V(s) = max_a Q(s,a)
q_grid, greedy_actions = eval_state_action_space(func_approx, env)
# rmse's
diff = (q_grid - v_mdp_grid)
diff2 = diff ** 2
rmse = np.sqrt(np.mean(diff2))
nrmse = rmse / vrange
w = func_approx.s_ctr.astype(float)
wsum = w.sum()
wrmse = np.sqrt((w * diff2).sum() / wsum)
val = {
'ep': ep,
'q': q_grid,
'a': greedy_actions,
'vt1': q_grid[:, 0],
'rmse': float(rmse),
'nrmse': float(nrmse),
'wrmse': float(wrmse),
'epsilon': info.get('epsilon'),
'steps': info.get('steps'),
}
trace.append(val)
def eval_state_action_space(q_hat, env):
"""
Evaluate Q(s,a) over all non-terminal states s=[q,t] with q=1..max_inv, t=1..max_t
and all feasible actions from env.get_actions(s).
Args:
q_hat: Function approximator with method: q_hat.eval(state, action) -> float
env: Environment with members env.max_inv, env.max_t, env.get_actions(state)
Returns:
q_grid: np.ndarray shape (max_inv, max_t) with max_a Q(s,a)
greedy: pd.DataFrame with columns ['q','t','a'] for argmax_a Q(s,a)
"""
rows = []
for q in range(1, env.max_inv + 1):
for t in range(1, env.max_t + 1):
s = [q, t]
for a in env.get_actions(s): # includes [scrap_price] at t==max_t
rows.append((q, t, float(a), q_hat.eval(s, a)))
dat = pd.DataFrame(rows, columns=['q', 't', 'a', 'q_hat'])
# Greedy action per state
idx = dat.groupby(['q', 't'])['q_hat'].idxmax()
greedy = dat.loc[idx, ['q', 't', 'a']].reset_index(drop=True)
# Max-Q grid (q as rows, t as columns)
q_max = dat.groupby(['q', 't'])['q_hat'].max().reset_index()
q_grid = q_max.pivot(index='q', columns='t', values='q_hat').values
return q_grid, greedy
# Testing
q_hat.reset() # Assuming q_hat is defined in the global scope
trace = []
ep_semi_grad_sarsa(env, episodes=100, gamma=1.0, eps=0.1, q_hat=q_hat, s_start=[[65,1]], callback=callback, trace=trace)
print(trace[2])
Your turn¶
Explain the output stored in trace. What is the effect of the argument s_start=[[65,1]] to the algorithm?
Trace stores in each entry:
- episode number,
- all the q-values (2D array 0 index based [q-1,t-1]),
- the greedy action (best q-value) for a state (dataframe),
- the state-values (max over actions) at time 1 (1D array, 0-indexed [q-1]),
- the root mean squared error (RMSE) compared to the optimal MDP values,
- the normalised RMSE (RMSE divided by the value range, so it can be read as a fraction),
- the weighted RMSE (weights are visits to state, i.e. ignore states not visited),
- epsilon used and,
- steps used (length of episode).
All episodes start with state $(65,1)$.
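Before plotting, the scalar entries of trace can be gathered in a small data frame for a quick overview; a minimal sketch:
# scalar trace entries as a data frame (one row per stored episode)
summary = pd.DataFrame(
    [{k: entry[k] for k in ('ep', 'rmse', 'nrmse', 'wrmse', 'epsilon', 'steps')}
     for entry in trace]
)
print(summary)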
We can now plot the information in trace:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from plotnine import ggplot, aes, geom_point, theme
def plot_trace(trace, dat_mdp):
"""
Plot the latest max-Q surface in `trace` against the optimal MDP surface,
the estimated vs. optimal actions, the RMSE curves, and the visited states.
"""
num_3d = len(trace)
num_plots = num_3d + 1
# last trace and optimal q-values
plot_two_snapshots_plotly(trace, dat_mdp, env)
# last trace and optimal actions
dat_a = trace[-1]['a'].copy() # DataFrame with ['q','t','a']
dat_a['alg'] = 'approx'
dat_a = dat_a >> bind_rows(dat_mdp >> rename(a = X.action) >> select(~X.v) >> mutate(alg = 'mdp'))
dat_a['a'] = pd.Categorical(dat_a['a'])
pt = (
ggplot(dat_a, aes(x = "t", y = "q", color = "a"))
+ geom_point()
+ facet_wrap('alg')
+ theme(figure_size=(10, 5), legend_position='bottom')
+ labs(title = "Actions")
)
pt.show()
# RMSEs
plot_rmse(trace)
# visited states
rows, cols = q_hat.s_ctr.shape
row_indices, col_indices = np.meshgrid(np.arange(rows), np.arange(cols), indexing='ij')
row_indices_flat = row_indices.flatten()
col_indices_flat = col_indices.flatten()
flat = q_hat.s_ctr.flatten()
dat_s = pd.DataFrame({
'q': row_indices_flat+1,
't': col_indices_flat+1,
'n': flat
})
dat_s = dat_s >> mask(X.n > 0)
pt = (
ggplot(dat_s, aes(x = 't', y = 'q', size = 'n'))
+ geom_point(alpha = 0.5)
+ theme(figure_size=(10,5), legend_position='bottom')
+ labs(title = "Visited states")
)
pt.show()
def plot_rmse(trace):
ep = [t['ep'] for t in trace]
rmse = [t['rmse'] for t in trace]
nrmse = [t['nrmse'] for t in trace]
wrmse = [t['wrmse'] for t in trace]
dat = pd.DataFrame({'ep': ep, 'rmse': rmse, 'measure': 'rmse'})
dat = dat >> bind_rows(pd.DataFrame({'ep': ep, 'rmse': nrmse, 'measure': 'nrmse'}))
dat = dat >> bind_rows(pd.DataFrame({'ep': ep, 'rmse': wrmse, 'measure': 'wrmse'}))
pt = (
ggplot(dat, aes(x = 'ep', y = 'rmse'))
+ geom_line()
+ facet_wrap('measure', scales="free")
+ theme(figure_size=(10,5))
)
pt.show()
# def plot_trace_old(trace, start_state=None, n_rows=2):
# """
# Plot the max-Q surface per snapshot in `trace`,
# plus an optional line plot of V(q, t=1) at `start_state`.
# """
# if not trace:
# print("Trace is empty, no trace plots to show.")
# return
# add_panel = 1 if start_state is not None else 0
# num_3d = len(trace)
# num_plots = num_3d + 1 + add_panel
# n_cols = int(np.ceil(num_plots / n_rows))
# fig = plt.figure(figsize=(n_cols * 6, n_rows * 6))
# # 3D surfaces per snapshot
# for i, snap in enumerate(trace):
# row_idx = i // n_cols
# col_idx = i % n_cols
# subplot_idx = row_idx * n_cols + col_idx + 1
# ax3d = fig.add_subplot(n_rows, n_cols, subplot_idx, projection='3d')
# plot_q_max_3d(
# snap['q'], env,
# title=f"Episode {snap['ep']}",
# labels=['Inventory (q)', 'Time (t)', 'Max Q-value'],
# alpha=.9,
# axis=ax3d
# )
# ## optimal values
# tmp = dat_mdp.pivot(index='q', columns='t', values='v').values
# subplot_idx += 1
# ax3d = fig.add_subplot(n_rows, n_cols, subplot_idx, projection='3d')
# plot_q_max_3d(
# tmp, env,
# title="Optimal q-values (MDP)",
# labels=['Inventory (q)', 'Time (t)', 'Max Q-value'],
# alpha=.9,
# axis=ax3d
# )
# # Optional: value at a specific starting state across episodes
# if start_state is not None:
# q_idx = int(start_state[0]) - 1
# # guard
# if q_idx < 0 or q_idx >= trace[0]['q'].shape[0]:
# raise ValueError(f"start_state q out of range: {start_state[0]}")
# v_pts = [(snap['ep'], snap['q'][q_idx, 0]) for snap in trace]
# # next subplot slot
# subplot_idx = num_3d + 2
# ax_line = fig.add_subplot(n_rows, n_cols, subplot_idx)
# ax_line.plot([ep for ep, _ in v_pts], [v for _, v in v_pts], marker='o')
# ax_line.set_xlabel("Episode")
# ax_line.set_ylabel(f"V(q={start_state[0]}, t=1)")
# ax_line.set_title(f"State value over learning")
# fig.subplots_adjust(wspace=0.4, hspace=0.4)
# plt.show()
# # 2D scatter of greedy actions for the latest snapshot
# dat_a = trace[-1]['a'].copy() # DataFrame with ['q','t','a']
# dat_a['a'] = pd.Categorical(dat_a['a'])
# pt = ggplot(dat_a, aes(x='t', y='q', color='a')) + geom_point() + theme(figure_size=(10, 5), legend_position='bottom') + labs(title = "Best actions estimated")
# pt.show()
# ## optimal actions
# dat_mdp['action'] = pd.Categorical(dat_mdp['action'])
# pt_mdp = (
# ggplot(dat_mdp, aes(x = "t", y = "q", color = "action"))
# + geom_point()
# + theme(figure_size=(10, 5), legend_position='bottom')
# + labs(title = "Optimal policy (MDP)")
# )
# pt_mdp.show()
# def plot_q_3d(
# q_arr,
# env,
# alpha=1.0,
# title='Maximum Q-value per State',
# labels=('Inventory (q)', 'Time (t)', 'Max Q-value'),
# axis=None
# ):
# """
# 3D surface of Q([q,t], a) over q=1..max_inv, t=1..max_t.
# """
# # Replace NaNs with the min finite value (or 0 if all NaN)
# if np.isnan(q_arr).all():
# q_plot = np.zeros_like(q_arr)
# else:
# finite_vals = q_arr[np.isfinite(q_arr)]
# fill_val = float(finite_vals.min()) if finite_vals.size else 0.0
# q_plot = np.nan_to_num(q_arr, nan=fill_val)
# x_space = np.arange(1, env.max_inv + 1)
# y_space = np.arange(1, env.max_t + 1)
# Y, X = np.meshgrid(y_space, x_space)
# created_fig = False
# if axis is None:
# fig = plt.figure(figsize=(10, 8))
# axis = fig.add_subplot(111, projection='3d')
# created_fig = True
# surf = axis.plot_surface(X, Y, q_plot, cmap='viridis', linewidth=0, antialiased=True, alpha=alpha)
# axis.set_xlabel(labels[0])
# axis.set_ylabel(labels[1])
# axis.set_zlabel(labels[2])
# axis.set_title(title)
# # Ticks: a few neat integers
# axis.set_xticks(np.linspace(1, env.max_inv, num=min(env.max_inv, 6), dtype=int))
# axis.set_yticks(np.linspace(1, env.max_t, num=min(env.max_t, 6), dtype=int))
# if created_fig:
# plt.show()
import numpy as np
import plotly.graph_objects as go
def plot_q_3d_plotly(q_arr, env, title="Max Q-value per state"):
"""Interactive 3D surface for max Q-values using Plotly."""
# Clean up NaNs
if np.isnan(q_arr).all():
q_plot = np.zeros_like(q_arr)
else:
finite = q_arr[np.isfinite(q_arr)]
fill = float(finite.min()) if finite.size else 0.0
q_plot = np.nan_to_num(q_arr, nan=fill)
x = np.arange(1, env.max_inv + 1) # inventory
y = np.arange(1, env.max_t + 1) # time
Y, X = np.meshgrid(y, x)
fig = go.Figure(
data=[go.Surface(
x=X, y=Y, z=q_plot,
colorscale="Viridis",
colorbar=dict(title="Q"),
showscale=True,
)]
)
fig.update_layout(
title=title,
scene=dict(
xaxis_title="Inventory (q)",
yaxis_title="Time (t)",
zaxis_title="Max Q-value"
),
width=800,
height=600,
)
fig.show()
from plotly.subplots import make_subplots
import plotly.graph_objects as go
import numpy as np
def plot_two_snapshots_plotly(trace, dat_mdp, env, id=-1):
"""
Plot the Q-surface of a snapshot in `trace`,
and the optimal (MDP) Q-surface.
"""
fig = make_subplots(
rows=1, cols=2,
specs=[[{"type": "surface"}, {"type": "surface"}]],
subplot_titles=[f"Episode {trace[id]['ep']}", "Optimal (MDP)"]
)
# --- Data prep ---
q_arr = np.nan_to_num(trace[id]['q'])
tmp = dat_mdp.pivot(index='q', columns='t', values='v').values
x = np.arange(1, env.max_inv + 1)
y = np.arange(1, env.max_t + 1)
Y, X = np.meshgrid(y, x)
# --- Compute global z-range ---
zmin = np.nanmin([q_arr.min(), tmp.min()])
zmax = np.nanmax([q_arr.max(), tmp.max()])
# --- Surfaces ---
fig.add_trace(
go.Surface(x=X, y=Y, z=q_arr, colorscale="Viridis",
cmin=zmin, cmax=zmax, showscale=False),
row=1, col=1
)
fig.add_trace(
go.Surface(x=X, y=Y, z=tmp, colorscale="Viridis",
cmin=zmin, cmax=zmax, showscale=True, colorbar_title="Value"),
row=1, col=2
)
# --- Layout with shared z-axis range ---
fig.update_layout(
width=1100, height=500,
scene=dict(
xaxis_title="Inventory (q)",
yaxis_title="Time (t)",
zaxis_title="Value",
zaxis=dict(range=[zmin, zmax])
),
scene2=dict(
xaxis_title="Inventory (q)",
yaxis_title="Time (t)",
zaxis_title="Value",
zaxis=dict(range=[zmin, zmax])
),
)
fig.show()
# import numpy as np
# import plotly.graph_objects as go
# def plot_two_in_one_scene(trace, dat_mdp, env, id=-1, gap=2.0):
# """
# Show two Q-surfaces in ONE 3D scene so mouse rotation affects both.
# The second surface is shifted in y by `gap * env.max_t`.
# """
# # get data
# q1 = np.nan_to_num(trace[id]['q'])
# q2 = dat_mdp.pivot(index='q', columns='t', values='v').values
# x = np.arange(1, env.max_inv + 1)
# y = np.arange(1, env.max_t + 1)
# Y, X = np.meshgrid(y, x)
# # shift second surface in y
# shift = gap * env.max_t
# Y2 = Y + shift
# fig = go.Figure()
# fig.add_surface(
# x=X, y=Y, z=q1,
# colorscale="Viridis",
# name=f"Episode {trace[id]['ep']}",
# showscale=False,
# opacity=0.95
# )
# fig.add_surface(
# x=X, y=Y2, z=q2,
# colorscale="Plasma",
# name=f"Optimal (MDP)",
# showscale=False,
# opacity=0.95
# )
# fig.update_layout(
# title="Two Q-surfaces (rotate me!)",
# scene=dict(
# xaxis_title="Inventory (q)",
# yaxis_title="Time / surface",
# zaxis_title="Max Q",
# ),
# width=900,
# height=650
# )
# fig.show()
plot_trace(trace, dat_mdp)
Your turn¶
Explain what is plotted.
First, the maximum over the estimated q-values is plotted together with the optimal state-value function (MDP). Next, the estimated greedy actions are plotted together with the optimal actions (MDP). Finally, the three RMSE measures and the visited states are plotted.
Let us put all the code into a single code cell:
# define candidate sale prices
prices = [10, 15, 20, 25]
# instantiate environment
env = RLEnvSeasonal(
max_inv=100, # maximum inventory
max_t=15, # selling horizon (weeks)
scrap_price=5.0, # scrap value at final week
purchase_price=14.0, # cost of each unit purchased at t=1
prices=prices, # allowable selling prices
seed=876
)
# define function approx (effective step size is step_size/nb_tilings = 1/8)
q_hat = FuncApproxTiles(env, actions=prices, step_size=1, nb_tilings=8, nb_tiles=8, init_val=0, deterministic=True, seed=53487)
def callback1(ep, func_approx, trace, info, eval_every=1000):
callback(ep, func_approx, trace, info, eval_every)
# train
q_hat.reset_s_ctr()
q_hat.reset()
trace = []
ep_semi_grad_sarsa(
env, episodes=5000, gamma=1.0, eps=0.3, q_hat=q_hat,
# s_start=[start_state],
callback=callback1, trace=trace,
max_steps=env.max_t, # harmless safety
# eps_decay=decay_factor(0.00000000001, 0.3, 5000)
)
# plot results
plot_trace(trace, dat_mdp)
Your turn¶
Explain the output.
How good are our state-value estimates compared to optimal?
Here, we can use the first two plots. As can be seen, the shape of the estimated state-value function is similar to the optimal one. If we compare RMSEs, we are still on average more than 25% (nRMSE) away from the optimal values, and the improvement in RMSE seems to flatten. Of course, the wRMSE gives a better number, since unvisited states are ignored.
How good are our action estimates compared to optimal?
For unvisited states, the greedy action is just the first action (all q-values there equal the initial value). For visited states, the estimated action is close to the optimal one, but not always correct.
Do we visit all states during sampling?
No, only approximately half of the states are visited (at most about 75 times each). The remaining states are not visited because the probability of reaching them is almost zero.
How could we change the algorithm so that we visit more states during sampling?
We could let the starting state of an episode be picked among all the states instead (see the sketch below). This is mainly a good idea if we are interested in estimating the q-values of states that are visited with very low probability under the current policy.
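A minimal sketch of this idea (exploring starts over all non-terminal states); the call is left commented out since it would retrain q_hat:
# sample each episode's start state uniformly among all non-terminal states
all_states = [[q, t] for q in range(1, env.max_inv + 1) for t in range(1, env.max_t + 1)]
# ep_semi_grad_sarsa(env, episodes=5000, gamma=1.0, eps=0.3, q_hat=q_hat,
#                    s_start=all_states, callback=callback1, trace=[], max_steps=env.max_t)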
Which policy have we tried to find estimates for?
Since we do not decay the epsilon value, we estimate the action-values of the $\varepsilon$-soft policy.
Your turn¶
Could other starting estimates of the q-values be useful? Test by running the code with your choice.
#@title Solution
env.reset_rng()
# define function approx (effective step size is step_size/nb_tilings = 1/8)
q_hat = FuncApproxTiles(env, actions=prices, step_size=1, nb_tilings=8, nb_tiles=8, init_val=900, deterministic=True, seed=53487)
# train
trace = []
ep_semi_grad_sarsa(
env, episodes=5000, gamma=1.0, eps=0.3, q_hat=q_hat,
callback=callback1, trace=trace,
max_steps=env.max_t,
)
# plot results
plot_trace(trace, dat_mdp)
display(Markdown("""
Since the optimal state-values are between 0 and 1650, we may choose a starting
value around the middle, e.g. 900. This gives us better estimates.
"""))
Currently, we estimate the values of the $\varepsilon$-soft policy. However, we may use a decay to let epsilon (exploration) decrease over the episodes.
def decay_factor(goal_eps, start_eps, ep):
"""
Decay function for epsilon.
Args:
goal_eps (float): Final epsilon value wanted in the last episode.
start_eps (float): Initial epsilon value.
ep (int): Total number of episodes.
Returns:
decay factor to use to get to goal_eps in ep steps.
"""
return (goal_eps/start_eps)**(1/ep)
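A quick check of the schedule the factor produces (the numbers below are just an example):
factor = decay_factor(goal_eps=0.0001, start_eps=1.0, ep=5000)
print("Decay factor per episode:", factor)
print("Epsilon after 5000 episodes:", 1.0 * factor ** 5000)  # approx 0.0001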
Your turn¶
Use the decay function to compute the factor that must be passed as the eps_decay argument to ep_semi_grad_sarsa. Note that here you may start with a high epsilon and let it decay to e.g. 0.0001 at the end.
- Did your results improve?
#@title Solution
env.reset_rng()
# define function approx (effective step size is step_size/nb_tilings = 1/8)
q_hat = FuncApproxTiles(env, actions=prices, step_size=1, nb_tilings=8, nb_tiles=8, init_val=900, deterministic=True, seed=53487)
# train
trace = []
start_eps = 1
goal_eps = 0.0001
episodes = 5000
ep_semi_grad_sarsa(
env, episodes=episodes, gamma=1.0, eps=start_eps, q_hat=q_hat,
callback=callback1, trace=trace,
max_steps=env.max_t,
eps_decay=decay_factor(goal_eps, start_eps, episodes)
)
# plot results
plot_trace(trace, dat_mdp)
display(Markdown("""
Using the above setting seems to work. Here we explore fully at the start and a
little in the end. This makes us visit more states. Note, you may have got
different results depending on your setting of hyperparameters.
"""))
Episodic semi-gradient expected SARSA¶
Using Q-learning may not work since this is an off-policy algorithm, which may diverge due to the “deadly triad” (off-policy + bootstrapping + approximation). This is true even with linear features and fixed $\epsilon$-greedy behavior; there is no general convergence guarantee.
However, we may modify the algorithm to use expected SARSA instead.
#@title Episodic semi-gradient expected SARSA
def ep_semi_grad_exp_sarsa(
env,
episodes,
gamma,
eps,
q_hat,
s_start=None,
callback=None,
trace=None,
max_steps=None,
eps_decay=None # float or callable
):
"""
Episodic semi-gradient Expected SARSA for RLEnvSeasonal-like env.
Expects:
env.reset() -> [q, t]
env.get_actions(s) -> list of feasible actions
env.get_step(s, a) -> (s_next, r, done)
q_hat.eval(s, a) -> float
q_hat.train(s, a, target) -> None
"""
def argmax_rand(values):
m = np.max(values)
idx = np.flatnonzero(values == m)
return np.random.choice(idx)
def eps_greedy_action(s, eps_curr):
actions = env.get_actions(s)
if np.random.rand() < eps_curr:
return float(np.random.choice(actions))
qvals = np.array([q_hat.eval(s, a) for a in actions], dtype=float)
return float(actions[argmax_rand(qvals)])
def expected_q(s, eps_curr):
"""
E[ Q(s, A) ] where A ~ epsilon-greedy over actions in s.
"""
actions = env.get_actions(s)
nA = len(actions)
qvals = np.array([q_hat.eval(s, a) for a in actions], dtype=float)
# greedy(s)
greedy_idx = argmax_rand(qvals)
greedy_q = qvals[greedy_idx]
# prob of each action under epsilon-greedy
# P(a) = eps/nA for all a, plus (1-eps) for greedy
base_p = eps_curr / nA
probs = np.full(nA, base_p, dtype=float)
probs[greedy_idx] += (1.0 - eps_curr)
# expected Q
return float(np.dot(probs, qvals))
def pick_start():
if s_start is None:
return env.reset()
try:
if isinstance(s_start, (str, bytes)):
return s_start
return list(s_start)[np.random.randint(len(s_start))]
except TypeError:
return s_start
eps_curr = float(eps)
for ep in tqdm(range(1, episodes + 1), desc="Episode", unit="episode",
bar_format='{l_bar}{bar}| {n}/{total} [{elapsed}<{remaining}, {rate_fmt}{postfix}]'):
s = pick_start()
a = eps_greedy_action(s, eps_curr)
steps = 0
while True:
s_n, r, done = env.get_step(s, a)
if done:
# terminal target
q_hat.train(s, a, r)
break
else:
# Expected SARSA target
exp_q_next = expected_q(s_n, eps_curr)
target = r + gamma * exp_q_next
q_hat.train(s, a, target)
# next behavior action (still epsilon-greedy)
s, a = s_n, eps_greedy_action(s_n, eps_curr)
steps += 1
if max_steps is not None and steps >= max_steps:
break
# callback at the end of the episode
if callback is not None:
info = {"epsilon": eps_curr, "steps": steps}
callback(ep, q_hat, trace, info)
# optional epsilon schedule
if eps_decay is not None:
if callable(eps_decay):
eps_curr = float(eps_decay(ep, eps_curr))
else:
eps_curr = eps_curr * float(eps_decay)
eps_curr = min(1.0, max(0.0, eps_curr))
Your turn¶
- Run exp SARSA with your preferred parameters.
- Are your results better?
#@title Solution
env.reset_rng()
start_alph = 1/8 # overall alpha / nb tilings
goal_alph = 0.01
visits = 300 # approx max number of visits per state
q_hat = FuncApproxTiles(
env, actions=prices,
step_size=1,
nb_tilings=8, nb_tiles=8, init_val=900,
deterministic=True,
decay_fct=decay_factor(goal_alph, start_alph, visits)
)
# train
trace = []
start_eps = 1
goal_eps = 0.0001
episodes = 5000
ep_semi_grad_exp_sarsa(
env, episodes=episodes, gamma=1.0, eps=start_eps, q_hat=q_hat,
callback=callback1, trace=trace,
max_steps=env.max_t,
eps_decay=decay_factor(goal_eps, start_eps, episodes)
)
# plot results
plot_trace(trace, dat_mdp)
display(Markdown("""
For this run, it seems that the estimates are approximately the same (maybe a bit better).
However, more computation time is also needed. Note, you may also use a decay
factor for the function approximation.
"""))
Average Reward: A New Problem Setting for Continuing Tasks¶
The average-reward formulation treats continuing tasks by optimizing the long-run reward rate instead of discounted returns. The performance of a policy $\pi$ is defined as the steady-state average $$ \begin{align} r(\pi) &= \lim_{h\to\infty}\frac{1}{h}\,\mathbb{E}_\pi\!\left[\sum_{t=1}^{h} R_t\right] \\ &= \sum_s \mu_\pi(s) \sum_a \pi(a|s) \sum_{s',r} p(s',r|s,a)\,r \\ &= \sum_s \mu_\pi(s) \sum_a \pi(a|s)\, r(s,a), \end{align} $$ where $r(s,a)$ denotes the expected one-step reward. This holds if the Markov chain induced by $\pi$ is ergodic, i.e., the steady-state distribution $\mu_\pi(s)$ exists and is independent of the starting state.
To measure preferences between states and actions without discounting, we introduce differential returns that subtract the average rate at each step: $$ G_t \doteq \sum_{k=0}^{\infty}\big(R_{t+1+k}-r(\pi)\big). $$
The differential action-value function then becomes $$ \begin{align} q^\pi(s,a) &= \mathbb{E}_\pi[G_t\mid S_t=s, A_t=a] \\ &= \sum_{s',r} p(s',r\mid s,a)\Big(r - r(\pi) + \sum_{a'} \pi(a'\mid s')\,q^\pi(s',a')\Big). \end{align} $$ It satisfies Bellman relations analogous to the discounted case, but without $\gamma$ and with rewards centered by $r(\pi)$.
Learning proceeds by replacing discounted TD errors with differential TD errors that incorporate an estimate of the average reward. With function approximation on $\hat q(s,a,w)$ and a running estimate of $\bar R_t \approx r(\pi)$, the errors become $$ \delta_t^{q} \doteq R_{t+1}-\bar R_t + \hat q(S_{t+1},A_{t+1},w_t) - \hat q(S_t,A_t,w_t). $$ The semi-gradient update is $$ w_{t+1} \leftarrow w_t + \alpha\,\delta_t^{q}\,\nabla_w \hat q(S_t,A_t,w_t). $$
It remains to specify how the average-reward estimate is updated. This can be done incrementally with a small step size $\beta$ to ensure stability, e.g. $$ \bar R_{t+1} \leftarrow \bar R_t + \beta\,\delta_t^q. $$
Control replaces policy terms with maximization as usual, defining optimal differential values $v^*$ and $q^*$ and coupling learning with $\epsilon$-greedy improvement over $\hat q$.
Conceptually, this formulation aligns the objective with what matters in truly continuing problems (long-run reward rate), avoids artifacts from discounting, and remains compatible with multi-step, eligibility-trace, and function-approximation methods by the simple substitution of average-reward-centered TD errors.
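As a preview of how these pieces fit together, a minimal sketch of one differential semi-gradient Sarsa update, assuming a continuing-task environment whose get_step returns (next_state, reward) and a black-box approximator with eval/train (a full Expected SARSA version is implemented for the car rental exercise below):
def differential_sarsa_step(env, q_hat, s, a, next_action, bar_r, beta):
    """One differential semi-gradient Sarsa update; returns (s_next, a_next, bar_r)."""
    s_next, r = env.get_step(s, a)                   # continuing task: no terminal flag
    a_next = next_action(s_next)                     # e.g. epsilon-greedy behavior
    target = r - bar_r + q_hat.eval(s_next, a_next)  # differential one-step target
    delta = target - q_hat.eval(s, a)                # differential TD error
    bar_r += beta * delta                            # update the average-reward estimate
    q_hat.train(s, a, target)                        # semi-gradient update towards the target
    return s_next, a_next, bar_r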
Deprecating the Discounted Setting¶
Why are we moving from the discounted reward criterion to the average reward criterion?
The discounted reward criterion is ill-suited for truly continuing tasks once function approximation enters the picture. Function approximation introduces bias among states, so the approximate value function may not represent the true value function accurately.
Moreover, the policy improvement theorem does not apply with function approximation in the discounted setting. In fact, the approximation errors can be amplified by the discounting, and greedy improvement is not guaranteed to improve the policy. This loss of a policy improvement theorem means that discounted control lacks a firm local-improvement foundation under approximation.
Hence, the best solution is to replace discounted control in continuing tasks with the average-reward formulation and its differential value functions, which align the objective with the steady-state reward rate and integrate smoothly with semi-gradient methods.
Exercises¶
Exercise (seasonal inventory)¶
Solve the seasonal inventory and sales planning problem in Example 9.4.4.
Use a Fourier basis as function approximation to estimate the action-values.
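As a possible starting point, here is a minimal sketch of an order-$n$ Fourier cosine basis over a state normalized to $[0,1]^d$; how to combine the features per action, and which order to use, is part of the exercise:
import numpy as np
from itertools import product

def fourier_features(s, order, lows, highs):
    """Order-`order` Fourier cosine basis phi_c(s) = cos(pi * c . s_norm) for all integer vectors c."""
    s = np.asarray(s, dtype=float)
    s_norm = (s - np.asarray(lows)) / (np.asarray(highs) - np.asarray(lows))  # scale to [0, 1]
    coeffs = np.array(list(product(range(order + 1), repeat=s.size)))         # all coefficient vectors
    return np.cos(np.pi * coeffs @ s_norm)

# e.g. (3+1)^2 = 16 features for a state (q, t) with order 3:
# fourier_features([50, 3], order=3, lows=[1, 1], highs=[100, 15])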
Exercise (car rental)¶
Consider the Car rental problem. Our goal is to use function approximation to estimate the optimal state value function.
An environment is given below.
#@title Car rental environment
import numpy as np
from typing import Callable, Tuple, List, Dict
class RLEnvCar:
"""
Car rental environment (two locations) with Poisson demand/returns.
States are tuples (x, y) with x,y ∈ {0..20}.
"""
def __init__(self,
lD: List[float] = [3, 4],
lH: List[float] = [3, 2],
seed: int | None = None):
self.lD = list(lD)
self.lH = list(lH)
self.rng = np.random.default_rng(seed)
self.cars_max = 20 # max number of cars at each location
def reset(self):
"""Reset the environment."""
return (10, 10)
# ------------------------------------------------------------------
def get_states(self) -> List[Tuple[int, int]]:
"""Return all possible states as (x, y) tuples."""
return [(x, y) for x in range(21) for y in range(21)]
def get_actions(self, s: Tuple[int, int]) -> List[int]:
"""Return feasible integer actions for state (x, y)."""
x, y = s
low = -min(5, y, 20 - x)
high = min(5, x, 20 - y)
return list(range(low, high + 1))
def get_step(self, s: Tuple[int, int], a: int) -> Dict[str, float | Tuple[int, int]]:
"""
Take one step in the environment.
Args:
s: Current state (x, y)
a: Action, cars moved from 1→2 (negative = 2→1)
Returns:
tuple (next_state, reward).
"""
x, y = s
low = -min(5, y, 20 - x)
high = min(5, x, 20 - y)
if not (low <= a <= high):
raise ValueError(f"Action {a} not feasible in state {s}")
# Apply overnight move
x_bar = x - a
y_bar = y + a
# Demand and returns
d_x = int(self.rng.poisson(self.lD[0]))
d_y = int(self.rng.poisson(self.lD[1]))
h_x = int(self.rng.poisson(self.lH[0]))
h_y = int(self.rng.poisson(self.lH[1]))
# Rentals served
served_x = min(d_x, x_bar)
served_y = min(d_y, y_bar)
# Next state (capped)
x_next = min(20, x_bar - served_x + h_x)
y_next = min(20, y_bar - served_y + h_y)
reward = 10.0 * (served_x + served_y) - 2.0 * abs(a)
return (x_next, y_next), float(reward)
Let us create an instance of the environment:
env = RLEnvCar(seed=42)
print(env.get_step((10, 10), 3))
Q1¶
What is the output returned by the method get_step?
The output is the next state and the reward. Note that a state is now a tuple $(x, y)$ of integers, and actions are integers.
Differential (average-reward) semi-gradient Expected SARSA¶
To do function approximation, we modify the episodic Expected SARSA version into a continuing-task version:
#@title Differential (average-reward) semi-gradient Expected SARSA
import numpy as np
from typing import Any, Callable, Optional, Tuple, Dict
def cont_diff_expected_sarsa(
env: Any,
total_steps: int,
beta: float,
eps: float = 0.1,
q_hat: Any = None,
s_start: Optional[Tuple[int, int]] = None,
callback: Optional[Callable[[Any, Dict[str, float], Any], None]] = None,
callback_every: int = 0,
trace = None,
eps_decay: Optional[Callable[[int, float], float] | float] = None,
rng: Optional[np.random.Generator] = None,
) -> float:
"""
Differential (average-reward) semi-gradient Expected SARSA for continuing tasks.
TD target:
target = (r - R_bar) + gamma * E_{a'~pi_eps}[ Q(s', a') ]
TD error:
delta = target - Q(s, a)
Policy is epsilon-greedy over feasible actions with tie-aware greedy mass
(ties share the (1 - eps) probability).
Expected interfaces:
- env.get_actions(s) -> list of feasible actions
- env.get_step(s, a) -> (s_next, r)
- q_hat.eval(s, a) -> float
- q_hat.train(s, a, target) -> None # uses its own stepsize internally
Args:
env: Environment with get_actions/get_step and (optionally) reset().
total_steps: Number of environment steps (continuing task).
beta: Stepsize for the average reward estimate R_bar.
eps: Epsilon for epsilon-greedy behavior.
q_hat: Function approximator with eval()/train().
s_start: Optional start state; if None, uses env.reset().
callback: Optional callback(q_hat, info, trace) called periodically.
callback_every: Call callback every N steps (0 disables).
trace: passed through to callback
eps_decay: Float multiplier (e.g., 0.999) or callable f(step, eps)->new_eps.
rng: Optional NumPy Generator.
Returns:
float: Final estimate of the average reward R_bar.
"""
if rng is None:
rng = np.random.default_rng()
def argmax_random_tie(values: np.ndarray) -> int:
m = np.max(values)
ties = np.flatnonzero(values == m)
return int(rng.choice(ties))
def epsilon_greedy_action(s: Tuple[int, int], eps_curr: float) -> int:
actions = env.get_actions(s)
if not actions:
raise RuntimeError(f"No feasible actions in state: {s}")
if rng.random() < eps_curr:
return actions[int(rng.integers(len(actions)))]
qvals = np.array([q_hat.eval(s, a) for a in actions], dtype=float)
return actions[argmax_random_tie(qvals)]
def expected_q_under_eps_greedy(s: Tuple[int, int], eps_curr: float) -> float:
actions = env.get_actions(s)
nA = len(actions)
if nA == 0:
return 0.0
qvals = np.array([q_hat.eval(s, a) for a in actions], dtype=float)
greedy_val = qvals.max()
greedy_mask = (qvals == greedy_val)
base = eps_curr / nA
probs = np.full(nA, base, dtype=float)
probs[greedy_mask] += (1.0 - eps_curr) / greedy_mask.sum()
return float(probs @ qvals)
# Initialize starting state
s = env.reset() if s_start is None else s_start
# Running average reward
bar_R = 0.0
eps_curr = float(eps)
# First action
a = epsilon_greedy_action(s, eps_curr)
for step in tqdm(range(1, total_steps + 1), desc="Step", unit=" step",
bar_format='{l_bar}{bar}| {n}/{total} [{elapsed}<{remaining}, {rate_fmt}{postfix}]'):
s_next, r = env.get_step(s, a)
# Differential Expected SARSA target and TD error
exp_q_next = expected_q_under_eps_greedy(s_next, eps_curr)
q_sa = q_hat.eval(s, a)
target = (r - bar_R) + exp_q_next
delta = target - q_sa
# Update average reward estimate first (classic differential TD style)
bar_R += beta * delta
# Semi-gradient update for Q (q_hat handles its own stepsize)
q_hat.train(s, a, target)
# Next state/action
s = s_next
a = epsilon_greedy_action(s, eps_curr)
# Optional epsilon schedule
if eps_decay is not None:
            eps_curr = float(eps_decay(step, eps_curr)) if callable(eps_decay) \
                else float(eps_curr * eps_decay)
eps_curr = min(1.0, max(0.0, eps_curr))
        # Optional callback (fires every callback_every steps)
        if callback is not None and callback_every and (step % callback_every == 0):
info = {
"epsilon": eps_curr,
"bar_R": bar_R,
"step": step,
"beta": beta,
}
callback(q_hat, info, trace)
return float(bar_R)
Q2¶
Explain the main differences compared to the episodic version.
The task is continuing, so there are no episodes or terminal states: we loop over a fixed number of steps instead of episodes. There is also no discounting; instead we use differential returns that subtract the average reward from each reward. The average reward itself must be estimated online, which introduces the new step-size hyperparameter $\beta$.
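For reference, a compact way to write the quantities involved (following the average-reward formulation in Sutton & Barto, Ch. 10, here with the expected next value used by Expected SARSA) is
$$
r(\pi) = \lim_{h\to\infty}\frac{1}{h}\sum_{t=1}^{h}\mathbb{E}[R_t \mid \pi],
\qquad
\delta_t = R_{t+1} - \bar R_t + \sum_{a'}\pi(a'\mid S_{t+1})\,\hat q(S_{t+1},a',\mathbf{w}_t) - \hat q(S_t,A_t,\mathbf{w}_t),
\qquad
\bar R_{t+1} = \bar R_t + \beta\,\delta_t,
$$
where $\bar R_t$ is the running estimate of the average reward $r(\pi)$ and $\pi$ is the current $\varepsilon$-greedy policy, exactly as implemented in the code above.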
What is the effect of eps_decay?
When eps_decay is a float, eps_curr is multiplied by eps_decay at the end of every step, so the eps used in the eps-greedy action selection at step $k$ is eps * eps_decay**(k-1). That is, the amount of exploration is reduced as more steps are taken. See the example below.
#@title Solution (example)
eps_start = 0.1
eps_decay = 0.999995
for step in (1, 10, 1000, 5000, 50000, 100000, 500000):
    eps_curr = eps_start * (eps_decay ** (step - 1))
    print(f'Step = {step}, eps_curr = {eps_curr}')
Function approximation with tiles¶
Let us try to estimate using tile coding.
#@title Function approx with tiles
class FuncApproxTiles:
"""
Function approximator with 2D tile coding.
Assume tile coder (the same tiles/areas are used) for each discrete action.
"""
def __init__(self, env, step_size, nb_tilings, nb_tiles, init_val, seed=None, deterministic=False, decay_fct=1):
"""
Args:
env: The environment.
step_size: Step size, will be adjusted for nb_tilings automatically
nb_tilings: Number of tilings per action.
nb_tiles: Number of tiles per dimension.
init_val: initial action-values q(s,a).
seed: Seed used by the tile class.
            deterministic: If true use deterministic offsets.
            decay_fct: Decay factor for the step size given a state, i.e. use
                alpha * decay_fct^(num_of_visits - 1) as the step size.
"""
self._n_dim = 2
self.alpha = step_size / nb_tilings
self.decay_fct = decay_fct
self._num_tilings_per_action = nb_tilings
states = env.get_states()
actions = list({a for s in states for a in env.get_actions(s)})
self.actions = actions
self.act2idx = {a: i for i, a in enumerate(actions)}
self.st2idx = {s: i for i, s in enumerate(states)}
# feasible actions tile coder (use a single one = same tiles for all actions)
self.tc = TileCoder(
n_tilings=nb_tilings,
tiles_per_dim=[nb_tiles, nb_tiles],
ranges=[(1.0, float(env.cars_max)), (1.0, float(env.cars_max))],
wrap=[False, False],
deterministic=deterministic,
seed=seed
)
self._init_val = float(init_val)
        ## weight matrix (one row of weights per action)
self.w = np.zeros((len(actions), self.tc.n_features), dtype=np.float32) + self._init_val / nb_tilings
self.s_ctr = np.zeros(len(states), dtype=int) # to store visits to s (use st2idx to find idx)
def reset_weights(self, keep_init=True):
if keep_init:
self.w[:] = self._init_val / self._num_tilings_per_action
else:
self.w.fill(0.0)
def reset_s_ctr(self):
self.s_ctr.fill(0)
def reset(self):
self.reset_weights()
self.reset_s_ctr()
def eval(self, state, action):
"""
Return Q(s, a) for a specific action.
Args:
state: The state.
action: The action.
"""
assert len(state) == self._n_dim
assert np.isscalar(action)
a = self.act2idx[action]
idx = self.tc.active_indices(state)
return float(self.w[a, idx].sum())
def train(self, state, action, target):
"""
Update Q(s, a) for a specific action using semi-gradient TD.
Args:
state: The state.
action: The action.
target: The target value.
"""
assert len(state) == self._n_dim
assert np.isscalar(action)
assert np.isscalar(target)
self.s_ctr[self.st2idx[state]] += 1
a = self.act2idx[action]
idx = self.tc.active_indices(state)
pred = self.w[a, idx].sum()
err = target - pred
alpha = self.alpha * (self.decay_fct ** (float(self.s_ctr[self.st2idx[state]]) - 1))
self.w[a, idx] += alpha * err
## Testing
q_hat = FuncApproxTiles(env, step_size=1, nb_tilings=8, nb_tiles=4, init_val=0, decay_fct = 0.999)
print("Active tiles for state [1,1]:", q_hat.tc.active_indices([1,1]))
print("Active tiles for state [10,15]:", q_hat.tc.active_indices([10,15]))
Q3¶
Explain the function class.
How are tiles used for each action?
The same tile coder is used for every discrete action $a$: the tilings, and hence the overlapping areas of the state space, are identical across actions. However, a separate row of weights is estimated for each action, so the actions still get different value estimates.
Which step-size alpha is used for estimation?
The overall learning rate step_size is given as input, and the step-size applied to each weight becomes $\alpha =$ step_size / nb_tilings. Since exactly nb_tilings features are active for any state, this ensures that the total update magnitude per input sample is stable, regardless of the number of tilings (see the small illustration after the example cell below).
What is the purpose of the argument decay_fct?
It is used to decrease the step-size as the estimate becomes better. For each state the step-size is $\alpha \cdot$ decay_fct ** (num_of_visits - 1). See an example in the code cell below.
#@title Solution (example)
decay_fct = 0.999
alpha = 1/8
for i in (0,1,10,50,100,200,500):
print(f'Visits = {i+1}, step-size = {alpha * decay_fct ** i}')
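To see why the per-weight step-size is divided by the number of tilings (the step-size question above), here is a minimal NumPy-only illustration, independent of the TileCoder class: with nb_tilings active binary features, each active weight moves by $\alpha\cdot$error, so the prediction (their sum) moves by nb_tilings $\cdot\alpha\cdot$error $=$ step_size $\cdot$ error.
#@title Step-size scaling with tile coding (illustration)
import numpy as np

step_size = 0.5
nb_tilings = 8                      # one active feature per tiling
alpha = step_size / nb_tilings      # per-weight step size, as in FuncApproxTiles

w = np.zeros(nb_tilings)            # weights of the active tiles for some fixed (s, a)
target = 10.0

pred = w.sum()                      # Q(s, a) = sum of the active weights
err = target - pred
w += alpha * err                    # semi-gradient update on the active tiles

print("prediction moved by:", w.sum() - pred)                     # = step_size * err = 5.0
print("fraction of the error covered:", (w.sum() - pred) / err)   # = step_size = 0.5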
Q4¶
Run differential (average-reward) semi-gradient Expected SARSA over 20 steps using code:
final_bar_R = cont_diff_expected_sarsa(
env=env,
total_steps=20,
beta=0.01, # avg-reward stepsize (try 0.005–0.02)
eps=0.1,
q_hat=q_hat,
s_start=(10, 10),
eps_decay=0.999995,
callback_every=10,
callback=lambda qhat, info, trace: print(
f"[{info['step']}] bar_R={info['bar_R']:.3f} eps={info['epsilon']:.3f}"
),
)
print("Final average reward estimate:", final_bar_R)
Explain the arguments used as input and the output.
We run learning over 20 steps and print every 10 steps info about the algorithm.
Let us define a callback that stores info about the solution and some helper functions for evaluating the approximation.
def callback(func_approx, info, trace):
"""
Callback for differential (average-reward) semi-gradient Expected SARSA.
"""
step = info['step']
# Evaluate learned Q(s,a) → V(s) = max_a Q(s,a)
q_grid, greedy_actions = eval_state_action_space(func_approx, env)
val = {
'q': q_grid,
'a': greedy_actions,
'epsilon': info.get('epsilon'),
'step': step,
'bar_R': info.get('bar_R'),
}
trace.append(val)
def eval_state_action_space(q_hat, env):
"""
Evaluate Q(s,a).
Args:
q_hat: Function approximator with method: q_hat.eval(state, action) -> float
        env: Environment with member env.cars_max and method env.get_actions(state)
    Returns:
        q_grid: np.ndarray with max_a Q(s,a) for each state (x as rows, y as columns)
greedy: pd.DataFrame with columns ['x','y','a'] for argmax_a Q(s,a)
"""
rows = []
for x in range(env.cars_max+1):
for y in range(env.cars_max+1):
s = [x, y]
            for a in env.get_actions(s):  # only feasible actions in state s
rows.append((x, y, int(a), q_hat.eval(s, a)))
dat = pd.DataFrame(rows, columns=['x', 'y', 'a', 'q_hat'])
# Greedy action per state
idx = dat.groupby(['x', 'y'])['q_hat'].idxmax()
greedy = dat.loc[idx, ['x', 'y', 'a']].reset_index(drop=True)
    # Max-Q grid (x as rows, y as columns)
q_max = dat.groupby(['x', 'y'])['q_hat'].max().reset_index()
q_grid = q_max.pivot(index='x', columns='y', values='q_hat').values
return q_grid, greedy
def plot_trace(trace, dat_mdp):
"""
Plot the max-Q surface for the last snapshot in `trace`, the corresponding
actions, the visited states and the average reward.
"""
# last trace and optimal q-values
plot_q_val(trace, dat_mdp, env)
# last trace and optimal actions
dat_a = trace[-1]['a'].copy() # DataFrame with ['x','y','a']
dat_a['a'] = pd.Categorical(dat_a['a'])
pt = (
ggplot(dat_a, aes(x = "x", y = "y", color = "a"))
+ geom_point()
+ theme(figure_size=(10, 5), legend_position='bottom')
+ labs(title = "Actions")
)
pt.show()
# visited states
dat_s = [ [s[0], s[1], q_hat.s_ctr[q_hat.st2idx[s]]] for s in env.get_states()]
dat_s = pd.DataFrame(dat_s, columns=['x', 'y', 'n'])
dat_s = dat_s >> mask(X.n > 0)
pt = (
ggplot(dat_s, aes(x = 'x', y = 'y', size = 'n'))
+ geom_point(alpha = 0.5)
+ theme(figure_size=(10, 5), legend_position='bottom')
+ labs(title = "Visited states")
)
pt.show()
# average reward
step = [t['step'] for t in trace]
r = [t['bar_R'] for t in trace]
dat = pd.DataFrame({'step': step, 'ave_reward': r})
pt = (
        ggplot(dat, aes(x = 'step', y = 'ave_reward'))
+ geom_line()
+ theme(figure_size=(10, 5), legend_position='bottom')
)
pt.show()
def plot_q_val(trace, dat_mdp, env, id=-1):
"""
    Plot the max-Q surface of a snapshot in `trace`.
"""
fig = make_subplots(
rows=1, cols=1,
specs=[[{"type": "surface"}]],
subplot_titles=[f"Step {trace[id]['step']}"]
)
# --- Data prep ---
q_arr = np.nan_to_num(trace[id]['q'])
# tmp = dat_mdp.pivot(index='x', columns='y', values='Value').values
x = np.arange(0, env.cars_max + 1)
y = np.arange(0, env.cars_max + 1)
Y, X = np.meshgrid(y, x)
# --- Compute global z-range ---
zmin = np.nanmin([q_arr.min()])
zmax = np.nanmax([q_arr.max()])
# --- Surfaces ---
fig.add_trace(
go.Surface(x=X, y=Y, z=q_arr, colorscale="Viridis",
cmin=zmin, cmax=zmax, showscale=False),
row=1, col=1
)
# fig.add_trace(
# go.Surface(x=X, y=Y, z=tmp, colorscale="Viridis",
# cmin=zmin, cmax=zmax, showscale=True, colorbar_title="Value"),
# row=1, col=2
# )
# --- Layout with shared z-axis range ---
fig.update_layout(
width=700, height=700,
scene=dict(
xaxis_title="x",
yaxis_title="y",
zaxis_title="Value",
zaxis=dict(range=[zmin, zmax])
),
# scene2=dict(
# xaxis_title="Inventory (q)",
# yaxis_title="Time (t)",
# zaxis_title="Value",
# zaxis=dict(range=[zmin, zmax])
# ),
)
fig.show()
q_hat.reset()
trace = []
total_steps = 1000
cont_diff_expected_sarsa(
env=env,
total_steps=total_steps,
beta=0.01,
eps=0.1,
q_hat=q_hat,
s_start=(10, 10),
eps_decay=0.999995,
callback_every=50,
callback=callback,
trace=trace
)
plot_trace(trace, dat_mdp)
Q5¶
Give a short summary of all the functions and their purpose.
- The callback stores, for each snapshot, the estimated best q-values and the corresponding greedy actions, plus the step, epsilon and average reward estimate.
- Function eval_state_action_space evaluates the approximation over the whole state space and returns the best q-values and the corresponding greedy actions.
- Function plot_trace plots the max-Q surface for the last snapshot in trace, the corresponding actions, the visited states and the average reward.
Is the output reasonable?
We have only run a few steps, so the approximation is not very good yet, but we can already see that the relative function values are reasonable (higher for higher $(x,y)$).
Q6¶
- Run the algorithm using 100000 steps, with callback_every = 5000.
- Comment on your results.
- Give an interpretation of $q(s,a)$, e.g. what does it mean if $q((3,15), -4) = 10$ or $-5$?
#@title Solution
q_hat.reset()
trace = []
cont_diff_expected_sarsa(
env=env,
total_steps=100000,
beta=0.01,
eps=0.1,
q_hat=q_hat,
s_start=(10, 10),
eps_decay=0.999995,
callback_every=5000,
callback=callback,
trace=trace
)
plot_trace(trace, dat_mdp)
display(Markdown("""
The slope of the function is now as expected, but still not very smooth. The approximation is best for the states visited most.
The q-values are interpreted as the expected excess (or deficit) reward obtained by starting in state $s$, taking action $a$,
and then following the estimated policy $\\pi$, relative to the long-run average reward. That is, $q(s,a)$ measures
how much better or worse the current state–action pair is compared to what would be earned on average if the policy continued
indefinitely. More specifically:
- $q(s,a) > 0$: taking $a$ in $s$ yields an above-average long-term return.
- $q(s,a) < 0$: taking $a$ in $s$ yields a below-average long-term return.
For example, $q((3,15), -4) = 10$ means that moving 4 cars to Location 1 in state $(3, 15)$ gives an expected excess of 10
relative to the long-run average reward, while $q((3,15), -4) = -5$ would mean a shortfall of 5.
"""))
Q7¶
Could another function approximation be suitable here?
Yes, the function seems very smooth, so a polynomial approximation may be a good suggestion.
Apply your suggestion to the problem.
See code below
#@title Solution
# Your code
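Below is one possible sketch (not a reference solution): a linear approximator with polynomial features of the normalized state, exposing the same eval/train interface so it can be plugged into cont_diff_expected_sarsa. The interface assumptions (env.get_states(), env.get_actions(s), env.cars_max) mirror FuncApproxTiles above; the class name, degree and step size are illustrative only.
#@title Solution sketch - polynomial function approximation
import numpy as np

class FuncApproxPoly:
    """
    Linear approximator with polynomial features of the 2D state and one
    weight vector per discrete action (same eval()/train() interface as
    FuncApproxTiles).
    """
    def __init__(self, env, step_size, degree=3, init_val=0.0):
        self.alpha = step_size
        self._init_val = float(init_val)
        self.cars_max = float(env.cars_max)
        states = env.get_states()
        actions = list({a for s in states for a in env.get_actions(s)})
        self.act2idx = {a: i for i, a in enumerate(actions)}
        self.st2idx = {s: i for i, s in enumerate(states)}  # kept for compatibility with the visit counter
        self.s_ctr = np.zeros(len(states), dtype=int)
        # exponent pairs (i, j) with i + j <= degree; (0, 0) is the bias term
        self.exponents = [(i, d - i) for d in range(degree + 1) for i in range(d + 1)]
        self.w = np.zeros((len(actions), len(self.exponents)))
        self.w[:, 0] = self._init_val  # bias weight so that eval() starts at init_val

    def _features(self, state):
        # normalize both state dimensions to roughly [0, 1] before taking powers
        x, y = state[0] / self.cars_max, state[1] / self.cars_max
        return np.array([(x ** i) * (y ** j) for i, j in self.exponents])

    def reset(self):
        self.w.fill(0.0)
        self.w[:, 0] = self._init_val
        self.s_ctr.fill(0)

    def eval(self, state, action):
        return float(self.w[self.act2idx[action]] @ self._features(state))

    def train(self, state, action, target):
        self.s_ctr[self.st2idx[state]] += 1
        a = self.act2idx[action]
        phi = self._features(state)
        err = target - self.w[a] @ phi
        self.w[a] += self.alpha * err * phi  # semi-gradient linear update

# Illustrative usage (small step size, since polynomial features are dense):
# q_hat_poly = FuncApproxPoly(env, step_size=0.05, degree=3)
# trace = []
# cont_diff_expected_sarsa(env=env, total_steps=100000, beta=0.01, eps=0.1,
#                          q_hat=q_hat_poly, s_start=(10, 10), eps_decay=0.999995,
#                          callback_every=5000, callback=callback, trace=trace)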
Old stuff¶
#@title MDP solution
dat_mdp = pd.read_csv('https://drive.google.com/uc?id=18QUCVZ4QTP_3kcvc7-th_U9HWSlWiJUM', index_col = 0)
dat_mdp = pd.DataFrame(dat_mdp)
## store as array for later use when calc RMSE
v_mdp_grid = dat_mdp.pivot(index='x', columns='y', values='Value').values
v_mdp_grid.shape
vmin, vmax = np.nanmin(v_mdp_grid), np.nanmax(v_mdp_grid) # min and max value
vrange = vmax - vmin if vmax > vmin else 1.0 # range if zero set to 1 to avoid division by zero
n-step SARSA¶
To extend one-step Sarsa we widen the forward view to $n$ steps. The target accumulates $n$ rewards and then bootstraps if the episode continues:
$$
G_{t:t+n} =
\begin{cases}
\sum_{k=1}^{n} \gamma^{k-1} R_{t+k} \;+\; \gamma^{n}\,\hat q(S_{t+n},A_{t+n},\mathbf{w}) & \text{if } t+n < T,\\
G_t & \text{otherwise,}
\end{cases}
$$
where $T$ denotes the time of termination. If the episode terminates within the next $n$ steps ($t+n \ge T$), the bootstrap term is omitted and $G_{t:t+n}$ is just the remaining episodic return $G_t$.
Our update, applied at time $\tau = t - n + 1$ (the state–action pair visited $n$ steps earlier), becomes $$ \mathbf{w} \leftarrow \mathbf{w} + \alpha\big[G_{\tau:\tau+n} - \hat q(S_\tau,A_\tau,\mathbf{w})\big]\nabla_{\mathbf{w}} \hat q(S_\tau,A_\tau,\mathbf{w}). $$ A small numeric illustration of the $n$-step target is given below.
Exploration could be $\epsilon$-greedy with respect to $\hat q$, forming an on-policy loop that both explores and improves.
The choice of look-ahead steps $n$ trades off bias and variance: $n=1$ learns quickly but can be myopic; very large $n$ approaches Monte Carlo with higher variance; small-to-moderate $n$ often yields faster learning and good stability in practice.
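As a small illustration (purely hypothetical numbers, not tied to the environments used here), the snippet below computes the $n$-step target for $n = 3$ from a short list of rewards and a bootstrapped value:
#@title n-step return (illustration)
gamma = 0.9
n = 3
rewards = [2.0, 0.0, 5.0]   # R_{t+1}, ..., R_{t+n} (hypothetical)
q_boot = 4.0                # \hat q(S_{t+n}, A_{t+n}, w) (hypothetical)

# G_{t:t+n} = sum_{k=1..n} gamma^(k-1) * R_{t+k}  +  gamma^n * q_boot
G = sum(gamma ** (k - 1) * r for k, r in enumerate(rewards, start=1)) + gamma ** n * q_boot
print(G)   # 2 + 0.9*0 + 0.81*5 + 0.729*4 = 8.966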
Your turn¶
Modify the algorithm ep_semi_grad_sarsa to an n-step version.
#@title Solution - Episodic semi-gradient n-step SARSA
import numpy as np
def ep_semi_grad_sarsa_n_step(env, episodes, gamma, eps, q_hat,
n=4, s_start=None, max_steps=10000,
callback=None, trace=None):
"""
Episodic semi-gradient n-step SARSA.
Args:
env: has methods reset(), get_actions(s)->list, get_step(s,a)->(s_next, r, done)
episodes (int)
gamma (float): discount
eps (float): epsilon for ε-greedy
q_hat: function approximator with eval(s,a)->float and train(s,a,target)->None
n (int): number of steps
s_start: optional start state or list of start states
max_steps (int): safety cap per episode
callback: function(ep, func_approx, trace)
trace: object passed to callback
"""
def argmax_rand(arr):
"""Pick an argmax uniformly at random among ties."""
arr = np.asarray(arr)
return np.random.choice(np.flatnonzero(arr == arr.max()))
def epsilon_greedy(s, eps):
actions = env.get_actions(s)
if not actions: # no feasible action
return None
if np.random.rand() < eps:
return np.random.choice(actions)
q_vals = [q_hat.eval(s, a) for a in actions]
return actions[argmax_rand(q_vals)]
for ep in tqdm(range(1, episodes + 1), desc="Episode", unit="episode",
bar_format='{l_bar}{bar}| {n}/{total} [{elapsed}<{remaining}, {rate_fmt}{postfix}]'):
# start state
if s_start is None:
s0 = env.reset()
else:
s0 = np.random.choice(s_start) if isinstance(s_start, (list, tuple, np.ndarray)) else s_start
        # store the whole episode (absolute time indexing keeps the n-step
        # bookkeeping simple and correct, also after termination)
        S = [s0]     # S[t] = state at time t
        A = []       # A[t] = action taken at time t
        R = [0.0]    # R[t] = reward received at time t (R[0] is a dummy)
        a0 = epsilon_greedy(s0, eps)
        if a0 is None:
            # terminal right away (or no feasible actions)
            if callback is not None:
                callback(ep, q_hat, trace)
            continue
        A.append(a0)
        T = np.inf   # time of termination
        t = 0        # current time
        if callback is not None:
            callback(ep, q_hat, trace)
        while True:
            if t < T:
                # take action A[t], observe and store R_{t+1} and S_{t+1}
                s_next, r, done = env.get_step(S[t], A[t])
                S.append(s_next)
                R.append(r)
                if done:
                    T = t + 1
                else:
                    a_next = epsilon_greedy(s_next, eps)
                    A.append(a_next)
            tau = t - n + 1  # time whose estimate is updated now
            if tau >= 0:
                # n-step return: discounted rewards from tau+1 to min(tau+n, T)
                upper = int(min(tau + n, T))
                G = 0.0
                for k in range(tau + 1, upper + 1):
                    G += (gamma ** (k - tau - 1)) * R[k]
                if tau + n < T:
                    # bootstrap from Q(S_{tau+n}, A_{tau+n}) if the episode continues
                    a_tau_n = A[tau + n]
                    if a_tau_n is not None:
                        G += (gamma ** n) * q_hat.eval(S[tau + n], a_tau_n)
                # semi-gradient update of Q(S_tau, A_tau) towards the n-step target
                if A[tau] is not None:
                    q_hat.train(S[tau], A[tau], G)
            t += 1
            if tau >= T - 1:
                break
            if t > max_steps:
                # safety exit to avoid runaway episodes
                break
# Example usage (seasonal pricing environment):
prices = [10, 15, 20, 25]
# instantiate environment
env = RLEnvSeasonal(
max_inv=100, # maximum inventory
max_t=15, # selling horizon (weeks)
scrap_price=5.0, # scrap value at final week
purchase_price=14.0, # cost of each unit purchased at t=1
prices=prices, # allowable selling prices
seed=876
)
# define function approx
q_hat = FuncApproxTiles(env, actions=prices, step_size=0.1, nb_tilings=8, nb_tiles=8, init_val=0)
# train
q_hat.reset()
trace = []
start_state = None
ep_semi_grad_sarsa_n_step(env, episodes=1000, gamma=1.0, eps=0.1, q_hat=q_hat, n=4, s_start=None, callback=callback, trace=trace)
# plot results (assuming plot_trace and callback are defined and work correctly)
start_state = [65,1] #356.573097
plot_trace(trace, start_state = start_state, n_rows=5)
Episodic semi-gradient Q-learning¶
Using Q-learning may not work well here: it is an off-policy algorithm, and combining off-policy learning with bootstrapping and function approximation (the so-called deadly triad) can be unstable.
#@title Episodic semi-gradient Q-learning
import numpy as np
def ep_semi_grad_q_learning(
env,
episodes,
gamma,
eps,
q_hat,
s_start=None,
callback=None,
trace=None,
max_steps=None,
eps_decay=None # float in (0,1) or callable (ep, eps)->new_eps
):
"""
Episodic semi-gradient Q-learning (off-policy control) for RLEnvSeasonal-like env.
Expects:
env.reset() -> [q, t]
env.get_actions(s) -> list of feasible actions (s is not None)
env.get_step(s, a) -> (s_next, r, done) with s_next=None if terminal
q_hat.eval(s, a) -> float
q_hat.train(s, a, target) -> None # semi-gradient step toward 'target'
"""
def argmax_rand(values):
m = np.max(values)
idx = np.flatnonzero(values == m)
return np.random.choice(idx)
def greedy_action(s):
actions = env.get_actions(s)
qvals = np.array([q_hat.eval(s, a) for a in actions], dtype=float)
return float(actions[argmax_rand(qvals)])
def policy(s, eps_curr):
actions = env.get_actions(s)
if np.random.rand() < eps_curr:
return float(np.random.choice(actions))
return greedy_action(s)
def pick_start():
if s_start is None:
return env.reset()
try:
if isinstance(s_start, (str, bytes)):
return s_start
return list(s_start)[np.random.randint(len(s_start))]
except TypeError:
return s_start
eps_curr = float(eps)
for ep in tqdm(range(1, episodes + 1), desc="Episode", unit="episode",
bar_format='{l_bar}{bar}| {n}/{total} [{elapsed}<{remaining}, {rate_fmt}{postfix}]'):
s = pick_start()
a = policy(s, eps_curr)
steps = 0
G = 0.0
while True:
s_n, r, done = env.get_step(s, a)
# print("s,a,s_n,r,done:",s,a,s_n,r,done)
G += r
if done:
# target = r (terminal)
q_hat.train(s, a, r)
break
else:
# Q-learning target uses greedy action at next state (off-policy)
a_star = greedy_action(s_n)
target = r + gamma * q_hat.eval(s_n, a_star)
q_hat.train(s, a, target)
s = s_n
a = policy(s, eps_curr) # behavior policy stays ε-greedy
steps += 1
if max_steps is not None and steps >= max_steps:
break
if callback is not None:
info = {"epsilon": eps_curr, "steps": steps, "G": G}
callback(ep, q_hat, trace, info)
if eps_decay is not None:
eps_curr = (eps_curr * float(eps_decay)) if not callable(eps_decay) else float(eps_decay(ep, eps_curr))
eps_curr = min(1.0, max(0.0, eps_curr))
# define candidate sale prices
prices = [10, 15, 20, 25]
# instantiate environment
env = RLEnvSeasonal(
max_inv=100, # maximum inventory
max_t=15, # selling horizon (weeks)
scrap_price=5.0, # scrap value at final week
purchase_price=14.0, # cost of each unit purchased at t=1
prices=prices, # allowable selling prices
seed=876
)
# define function approx
start_inv = 65
q_hat = FuncApproxTiles(env, actions=prices, step_size=0.4, nb_tilings=8, nb_tiles=8, init_val=0.0, deterministic=True, seed=53487)
# 25*start_inv-14*start_inv
def callback1(ep, func_approx, trace, info, eval_every=2000):
callback(ep, func_approx, trace, info, eval_every)
# train
episodes = 15000
start_eps = 0.3
end_eps = 0.00000000001
q_hat.reset()
trace = []
ep_semi_grad_q_learning(
env, episodes=episodes, gamma=1.0, eps=start_eps, q_hat=q_hat,
s_start=[start_state],
callback=callback1, trace=trace,
max_steps=env.max_t, # harmless safety
eps_decay=0.9995
# eps_decay=decay_factor(end_eps, start_eps, episodes)
)
# plot results
plot_trace(trace, dat_mdp)
Control with Function Approximation when Only Some Actions are Feasible¶
In many reinforcement learning problems, not all actions are valid in every state. For example, a robot cannot move forward if blocked by a wall, or a game agent may only have certain moves available depending on the situation. When using function approximation for action-value functions, this poses a difficulty because the approximator (e.g., neural network or linear model) typically defines $\hat q(s,a,\mathbf{w})$ for all possible $a$, regardless of feasibility.
Representing Feasible Actions¶
To handle feasibility constraints, the algorithm must ensure that learning and decision-making only consider valid actions in each state. This is usually achieved by explicitly defining a feasibility mask or action set:
$$ \mathcal{A}(s) = \{ a \in \mathcal{A} \mid a \text{ is allowed in state } s \}. $$
During both learning and control:
- Only actions $a \in \mathcal{A}(s)$ are used when computing greedy or exploratory choices.
- Updates to the value function are made only for these actions.
Formally, the control policy becomes:
$$ \pi(s) = \arg\max_{a \in \mathcal{A}(s)} \hat q(s,a,\mathbf{w}). $$
This restriction ensures that infeasible actions are never selected or used to propagate errors.
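As a minimal sketch of this restriction (the helper name and the rng argument are illustrative; any approximator exposing eval(s, a) would do):
#@title Greedy selection restricted to feasible actions (sketch)
import numpy as np

def greedy_feasible(q_hat, env, s, rng=None):
    """Return argmax_a Q(s, a) taken over the feasible set A(s) only."""
    if rng is None:
        rng = np.random.default_rng()
    actions = env.get_actions(s)                   # A(s): feasible actions in s
    qvals = np.array([q_hat.eval(s, a) for a in actions])
    best = np.flatnonzero(qvals == qvals.max())    # ties broken at random
    return actions[int(rng.choice(best))]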
Handling Infeasible Actions in Function Approximation¶
If the function approximator produces outputs for all actions (e.g., a neural network with one output per action), the infeasible actions’ values must be prevented from influencing updates. Common strategies include:
- Masking outputs: during both forward and backward passes, the outputs corresponding to infeasible actions are masked out. The loss and gradient updates ignore these entries, ensuring that only valid actions contribute to parameter updates.
- Action-dependent features: when using linear function approximation $\hat q(s,a,\mathbf{w}) = \mathbf{w}^T \mathbf{x}(s,a)$, the feature vector $\mathbf{x}(s,a)$ can be designed to be zero for infeasible actions. Then no update occurs for them, and their estimated value remains irrelevant.
- Constrained sampling: in algorithms like Q-learning or actor–critic, only feasible actions are sampled when forming targets such as
$$ R_{t+1} + \gamma \max_{a' \in \mathcal{A}(S_{t+1})} \hat q(S_{t+1},a',\mathbf{w}). $$
This prevents bootstrapping from illegal choices.
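A small sketch of the masking and constrained-sampling strategies above, for an approximator that outputs a vector of Q-values over all actions (the arrays below are hypothetical):
#@title Masked max-Q bootstrap target (sketch)
import numpy as np

def masked_max_q(q_values, feasible_mask):
    """Max over feasible actions only; infeasible entries are set to -inf."""
    return np.where(feasible_mask, q_values, -np.inf).max()

# hypothetical next-state Q-values and feasibility mask
q_next = np.array([1.2, 3.4, 0.7, 2.9])
feasible = np.array([True, False, True, True])
r, gamma = 1.0, 0.9
target = r + gamma * masked_max_q(q_next, feasible)  # bootstraps only from feasible actions
print(target)  # 1.0 + 0.9 * 2.9 = 3.61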
Actor–Critic Formulation¶
In actor–critic methods, the actor represents a parameterized policy $\pi(a|s,\theta)$. When only some actions are feasible, the policy distribution is renormalized over $\mathcal{A}(s)$:
$$ \pi(a|s,\theta) = \begin{cases} \frac{e^{h(s,a,\theta)}}{\sum_{a' \in \mathcal{A}(s)} e^{h(s,a',\theta)}}, & a \in \mathcal{A}(s), \\ 0, & \text{otherwise.} \end{cases} $$
This guarantees that the policy assigns zero probability to invalid actions while preserving smoothness and differentiability for feasible ones.
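A small sketch of this renormalization, with the preferences $h(s,a,\theta)$ stored as a vector and hypothetical numbers (a numerically stable softmax over the feasible entries only):
#@title Softmax policy over feasible actions (sketch)
import numpy as np

def feasible_softmax(h, feasible_mask):
    """pi(a|s): softmax of preferences h over feasible actions, 0 elsewhere."""
    probs = np.zeros_like(h, dtype=float)
    hf = h[feasible_mask]
    hf = hf - hf.max()                      # shift for numerical stability
    probs[feasible_mask] = np.exp(hf) / np.exp(hf).sum()
    return probs

h = np.array([0.5, 2.0, -1.0, 0.0])         # preferences h(s, a, theta), hypothetical
feasible = np.array([True, False, True, True])
print(feasible_softmax(h, feasible))        # zero probability on the infeasible action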
Practical Considerations¶
- When the set of feasible actions changes frequently or depends on continuous variables (e.g., physical constraints), it can be embedded directly in the environment’s step function or transition model.
- For neural approximators, masking must be applied consistently in both the forward (value computation) and backward (gradient update) passes.
- Exploration strategies such as $\epsilon$-greedy or softmax sampling must also respect the feasible action set.
Summary¶
Control with function approximation under action constraints follows the same learning principles as the unconstrained case, but all selection, evaluation, and update operations are restricted to the feasible subset of actions at each state. This ensures that:
- the policy never proposes invalid actions,
- the approximator only learns about meaningful $(s,a)$ pairs,
- and bootstrapped targets remain consistent with environment dynamics.
Formally:
$$ \forall s, \quad \text{updates and decisions involve only } a \in \mathcal{A}(s). $$